""" 测试ODBC连接池 前提条件: 1. 已配置ODBC数据源:YHC-test 2. 数据源可以正常连接到Sybase数据库 """ import sys import os import time from threading import Thread # 添加项目路径 sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) def test_basic_connection(): """测试1: 基本连接""" print("\n" + "=" * 60) print("测试1: 基本连接") print("=" * 60) try: from iti.applications.service.erp import PyODBCConnectionPool # 创建连接池 pool = PyODBCConnectionPool(dsn="YHC-test", pool_size=2) print("[OK] 连接池创建成功") # 获取连接并执行简单查询 with pool.get_connection() as conn: cursor = conn.cursor() cursor.execute("SELECT 1 as test") result = cursor.fetchone() cursor.close() print(f"[OK] 查询成功: {result}") return True except Exception as e: print(f"[FAIL] 测试失败: {e}") import traceback traceback.print_exc() return False def test_connection_reuse(): """测试2: 连接复用""" print("\n" + "=" * 60) print("测试2: 连接复用") print("=" * 60) try: from iti.applications.service.erp import PyODBCConnectionPool pool = PyODBCConnectionPool(dsn="YHC-test", pool_size=2) # 第一次查询 start = time.time() with pool.get_connection() as conn: cursor = conn.cursor() cursor.execute("SELECT 1") cursor.fetchone() cursor.close() time1 = time.time() - start print(f"[OK] 第1次查询耗时: {time1*1000:.2f}ms") # 第二次查询(应该复用连接,更快) start = time.time() with pool.get_connection() as conn: cursor = conn.cursor() cursor.execute("SELECT 1") cursor.fetchone() cursor.close() time2 = time.time() - start print(f"[OK] 第2次查询耗时: {time2*1000:.2f}ms") if time2 < time1: print(f"[OK] 连接复用生效,性能提升 {(time1/time2):.1f}倍") else: print("[INFO] 连接复用正常(时间差异不明显)") return True except Exception as e: print(f"[FAIL] 测试失败: {e}") import traceback traceback.print_exc() return False def test_erp_odbc_manager(): """测试3: ERPODBCManager""" print("\n" + "=" * 60) print("测试3: ERPODBCManager") print("=" * 60) try: from iti.applications.service.erp import ERPODBCManager # 创建管理器 manager = ERPODBCManager() # 模拟Flask配置 class MockApp: config = { "ERP_ODBC_CONFIG": { "dsn": "YHC-test", "pool_size": 3, "max_overflow": 5, } } app = MockApp() manager.init_app(app) print("[OK] ERPODBCManager初始化成功") # 测试execute_query results = manager.execute_query("SELECT 1 as test, 'hello' as msg") print(f"[OK] execute_query成功: {results}") if len(results) == 1 and results[0]['test'] == 1: print("[OK] 查询结果正确") return True except Exception as e: print(f"[FAIL] 测试失败: {e}") import traceback traceback.print_exc() return False def test_real_erp_query(): """测试4: 真实ERP查询""" print("\n" + "=" * 60) print("测试4: 真实ERP查询") print("=" * 60) try: from iti.applications.service.erp import ERPODBCManager manager = ERPODBCManager() class MockApp: config = { "ERP_ODBC_CONFIG": { "dsn": "YHC-test", "pool_size": 3, } } app = MockApp() manager.init_app(app) # 查询工单表(限制1条) sql = """ SELECT TOP 1 OrderNumber FROM monitor.ManufacturingOrder """ results = manager.execute_query(sql) if results: print(f"[OK] 查询到 {len(results)} 条工单") print(f"[OK] 示例数据: {results[0]}") else: print("[INFO] 查询结果为空(可能表中没有数据)") return True except Exception as e: print(f"[FAIL] 测试失败: {e}") print("[INFO] 如果是表不存在的错误,说明连接正常,只是表名不对") import traceback traceback.print_exc() return False def test_concurrent_queries(): """测试5: 并发查询""" print("\n" + "=" * 60) print("测试5: 并发查询(5个线程)") print("=" * 60) try: from iti.applications.service.erp import ERPODBCManager manager = ERPODBCManager() class MockApp: config = { "ERP_ODBC_CONFIG": { "dsn": "YHC-test", "pool_size": 3, "max_overflow": 5, } } app = MockApp() manager.init_app(app) results = [] errors = [] def query_task(thread_id): try: result = manager.execute_query(f"SELECT {thread_id} as thread_id, 'test' as msg") results.append((thread_id, result)) print(f" [线程{thread_id}] 查询成功") except Exception as e: errors.append((thread_id, str(e))) print(f" [线程{thread_id}] 查询失败: {e}") # 创建5个线程并发查询 threads = [] for i in range(5): t = Thread(target=query_task, args=(i+1,)) threads.append(t) t.start() # 等待所有线程完成 for t in threads: t.join() print(f"[OK] 并发查询完成: 成功 {len(results)} 个, 失败 {len(errors)} 个") if len(errors) == 0: print("[OK] 所有并发查询都成功") return True else: print(f"[WARN] 有 {len(errors)} 个查询失败") return False except Exception as e: print(f"[FAIL] 测试失败: {e}") import traceback traceback.print_exc() return False def test_performance(): """测试6: 性能对比""" print("\n" + "=" * 60) print("测试6: 性能对比(连接池 vs 每次新建连接)") print("=" * 60) try: import pyodbc from iti.applications.service.erp import PyODBCConnectionPool # 测试1: 不使用连接池(每次新建连接) print("\n[测试] 不使用连接池(10次查询)...") start = time.time() for i in range(10): conn = pyodbc.connect('DSN=YHC-test') cursor = conn.cursor() cursor.execute("SELECT 1") cursor.fetchone() cursor.close() conn.close() time_without_pool = time.time() - start print(f" 耗时: {time_without_pool:.3f}秒") # 测试2: 使用连接池 print("\n[测试] 使用连接池(10次查询)...") pool = PyODBCConnectionPool(dsn="YHC-test", pool_size=3) start = time.time() for i in range(10): with pool.get_connection() as conn: cursor = conn.cursor() cursor.execute("SELECT 1") cursor.fetchone() cursor.close() time_with_pool = time.time() - start print(f" 耗时: {time_with_pool:.3f}秒") # 性能提升 improvement = time_without_pool / time_with_pool print(f"\n[结果] 性能提升: {improvement:.1f}倍") if improvement > 2: print("[OK] 连接池性能提升显著") return True else: print("[INFO] 连接池性能提升不明显(可能查询太简单)") return True except Exception as e: print(f"[FAIL] 测试失败: {e}") import traceback traceback.print_exc() return False def main(): """运行所有测试""" print("=" * 60) print("ODBC连接池测试") print("=" * 60) print("\n前提条件:") print("1. 已配置ODBC数据源: YHC-test") print("2. 数据源可以正常连接") tests = [ ("基本连接", test_basic_connection), ("连接复用", test_connection_reuse), ("ERPODBCManager", test_erp_odbc_manager), ("真实ERP查询", test_real_erp_query), ("并发查询", test_concurrent_queries), ("性能对比", test_performance), ] results = [] for name, test_func in tests: try: result = test_func() results.append((name, result)) except Exception as e: print(f"\n[ERROR] 测试 '{name}' 异常: {e}") results.append((name, False)) # 输出结果 print("\n" + "=" * 60) print("测试结果汇总") print("=" * 60) for name, result in results: status = "[OK]" if result else "[FAIL]" print(f"{status} {name}") passed = sum(1 for _, r in results if r) total = len(results) print("\n" + "=" * 60) print(f"总计: {passed}/{total} 通过") print("=" * 60) return passed == total if __name__ == "__main__": success = main() sys.exit(0 if success else 1)