You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
iTi-Flask/tests/test_odbc_manager.py

347 lines
9.7 KiB
Python

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

"""
测试ODBC连接池
前提条件:
1. 已配置ODBC数据源YHC-test
2. 数据源可以正常连接到Sybase数据库
"""
import sys
import os
import time
from threading import Thread
# 添加项目路径
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
def test_basic_connection():
"""测试1: 基本连接"""
print("\n" + "=" * 60)
print("测试1: 基本连接")
print("=" * 60)
try:
from iti.applications.service.erp import PyODBCConnectionPool
# 创建连接池
pool = PyODBCConnectionPool(dsn="YHC-test", pool_size=2)
print("[OK] 连接池创建成功")
# 获取连接并执行简单查询
with pool.get_connection() as conn:
cursor = conn.cursor()
cursor.execute("SELECT 1 as test")
result = cursor.fetchone()
cursor.close()
print(f"[OK] 查询成功: {result}")
return True
except Exception as e:
print(f"[FAIL] 测试失败: {e}")
import traceback
traceback.print_exc()
return False
def test_connection_reuse():
"""测试2: 连接复用"""
print("\n" + "=" * 60)
print("测试2: 连接复用")
print("=" * 60)
try:
from iti.applications.service.erp import PyODBCConnectionPool
pool = PyODBCConnectionPool(dsn="YHC-test", pool_size=2)
# 第一次查询
start = time.time()
with pool.get_connection() as conn:
cursor = conn.cursor()
cursor.execute("SELECT 1")
cursor.fetchone()
cursor.close()
time1 = time.time() - start
print(f"[OK] 第1次查询耗时: {time1*1000:.2f}ms")
# 第二次查询(应该复用连接,更快)
start = time.time()
with pool.get_connection() as conn:
cursor = conn.cursor()
cursor.execute("SELECT 1")
cursor.fetchone()
cursor.close()
time2 = time.time() - start
print(f"[OK] 第2次查询耗时: {time2*1000:.2f}ms")
if time2 < time1:
print(f"[OK] 连接复用生效,性能提升 {(time1/time2):.1f}")
else:
print("[INFO] 连接复用正常(时间差异不明显)")
return True
except Exception as e:
print(f"[FAIL] 测试失败: {e}")
import traceback
traceback.print_exc()
return False
def test_erp_odbc_manager():
"""测试3: ERPODBCManager"""
print("\n" + "=" * 60)
print("测试3: ERPODBCManager")
print("=" * 60)
try:
from iti.applications.service.erp import ERPODBCManager
# 创建管理器
manager = ERPODBCManager()
# 模拟Flask配置
class MockApp:
config = {
"ERP_ODBC_CONFIG": {
"dsn": "YHC-test",
"pool_size": 3,
"max_overflow": 5,
}
}
app = MockApp()
manager.init_app(app)
print("[OK] ERPODBCManager初始化成功")
# 测试execute_query
results = manager.execute_query("SELECT 1 as test, 'hello' as msg")
print(f"[OK] execute_query成功: {results}")
if len(results) == 1 and results[0]['test'] == 1:
print("[OK] 查询结果正确")
return True
except Exception as e:
print(f"[FAIL] 测试失败: {e}")
import traceback
traceback.print_exc()
return False
def test_real_erp_query():
"""测试4: 真实ERP查询"""
print("\n" + "=" * 60)
print("测试4: 真实ERP查询")
print("=" * 60)
try:
from iti.applications.service.erp import ERPODBCManager
manager = ERPODBCManager()
class MockApp:
config = {
"ERP_ODBC_CONFIG": {
"dsn": "YHC-test",
"pool_size": 3,
}
}
app = MockApp()
manager.init_app(app)
# 查询工单表限制1条
sql = """
SELECT TOP 1
OrderNumber
FROM monitor.ManufacturingOrder
"""
results = manager.execute_query(sql)
if results:
print(f"[OK] 查询到 {len(results)} 条工单")
print(f"[OK] 示例数据: {results[0]}")
else:
print("[INFO] 查询结果为空(可能表中没有数据)")
return True
except Exception as e:
print(f"[FAIL] 测试失败: {e}")
print("[INFO] 如果是表不存在的错误,说明连接正常,只是表名不对")
import traceback
traceback.print_exc()
return False
def test_concurrent_queries():
"""测试5: 并发查询"""
print("\n" + "=" * 60)
print("测试5: 并发查询5个线程")
print("=" * 60)
try:
from iti.applications.service.erp import ERPODBCManager
manager = ERPODBCManager()
class MockApp:
config = {
"ERP_ODBC_CONFIG": {
"dsn": "YHC-test",
"pool_size": 3,
"max_overflow": 5,
}
}
app = MockApp()
manager.init_app(app)
results = []
errors = []
def query_task(thread_id):
try:
result = manager.execute_query(f"SELECT {thread_id} as thread_id, 'test' as msg")
results.append((thread_id, result))
print(f" [线程{thread_id}] 查询成功")
except Exception as e:
errors.append((thread_id, str(e)))
print(f" [线程{thread_id}] 查询失败: {e}")
# 创建5个线程并发查询
threads = []
for i in range(5):
t = Thread(target=query_task, args=(i+1,))
threads.append(t)
t.start()
# 等待所有线程完成
for t in threads:
t.join()
print(f"[OK] 并发查询完成: 成功 {len(results)} 个, 失败 {len(errors)}")
if len(errors) == 0:
print("[OK] 所有并发查询都成功")
return True
else:
print(f"[WARN] 有 {len(errors)} 个查询失败")
return False
except Exception as e:
print(f"[FAIL] 测试失败: {e}")
import traceback
traceback.print_exc()
return False
def test_performance():
"""测试6: 性能对比"""
print("\n" + "=" * 60)
print("测试6: 性能对比(连接池 vs 每次新建连接)")
print("=" * 60)
try:
import pyodbc
from iti.applications.service.erp import PyODBCConnectionPool
# 测试1: 不使用连接池(每次新建连接)
print("\n[测试] 不使用连接池10次查询...")
start = time.time()
for i in range(10):
conn = pyodbc.connect('DSN=YHC-test')
cursor = conn.cursor()
cursor.execute("SELECT 1")
cursor.fetchone()
cursor.close()
conn.close()
time_without_pool = time.time() - start
print(f" 耗时: {time_without_pool:.3f}")
# 测试2: 使用连接池
print("\n[测试] 使用连接池10次查询...")
pool = PyODBCConnectionPool(dsn="YHC-test", pool_size=3)
start = time.time()
for i in range(10):
with pool.get_connection() as conn:
cursor = conn.cursor()
cursor.execute("SELECT 1")
cursor.fetchone()
cursor.close()
time_with_pool = time.time() - start
print(f" 耗时: {time_with_pool:.3f}")
# 性能提升
improvement = time_without_pool / time_with_pool
print(f"\n[结果] 性能提升: {improvement:.1f}")
if improvement > 2:
print("[OK] 连接池性能提升显著")
return True
else:
print("[INFO] 连接池性能提升不明显(可能查询太简单)")
return True
except Exception as e:
print(f"[FAIL] 测试失败: {e}")
import traceback
traceback.print_exc()
return False
def main():
"""运行所有测试"""
print("=" * 60)
print("ODBC连接池测试")
print("=" * 60)
print("\n前提条件:")
print("1. 已配置ODBC数据源: YHC-test")
print("2. 数据源可以正常连接")
tests = [
("基本连接", test_basic_connection),
("连接复用", test_connection_reuse),
("ERPODBCManager", test_erp_odbc_manager),
("真实ERP查询", test_real_erp_query),
("并发查询", test_concurrent_queries),
("性能对比", test_performance),
]
results = []
for name, test_func in tests:
try:
result = test_func()
results.append((name, result))
except Exception as e:
print(f"\n[ERROR] 测试 '{name}' 异常: {e}")
results.append((name, False))
# 输出结果
print("\n" + "=" * 60)
print("测试结果汇总")
print("=" * 60)
for name, result in results:
status = "[OK]" if result else "[FAIL]"
print(f"{status} {name}")
passed = sum(1 for _, r in results if r)
total = len(results)
print("\n" + "=" * 60)
print(f"总计: {passed}/{total} 通过")
print("=" * 60)
return passed == total
if __name__ == "__main__":
success = main()
sys.exit(0 if success else 1)