|
|
|
|
@ -1,391 +0,0 @@
|
|
|
|
|
"""
|
|
|
|
|
ERP API管理器
|
|
|
|
|
|
|
|
|
|
提供基于requests的多用户Session管理和业务接口封装
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
import requests
|
|
|
|
|
from threading import RLock
|
|
|
|
|
from datetime import datetime, timedelta
|
|
|
|
|
from typing import Optional, Dict, Any, List
|
|
|
|
|
import logging
|
|
|
|
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class ERPAPISession:
|
|
|
|
|
"""单个用户的API Session"""
|
|
|
|
|
|
|
|
|
|
def __init__(self, username: str, password: str, config: dict):
|
|
|
|
|
self.username = username
|
|
|
|
|
self.password = password
|
|
|
|
|
self.config = config
|
|
|
|
|
|
|
|
|
|
self.base_url = config["base_url"].rstrip("/")
|
|
|
|
|
self.language_code = config.get("language_code", "zh")
|
|
|
|
|
self.company_number = config.get("company_number", "001_1.1")
|
|
|
|
|
self.verify_ssl = config.get("verify_ssl", False)
|
|
|
|
|
self.timeout = config.get("timeout", 30)
|
|
|
|
|
|
|
|
|
|
self.session = requests.Session()
|
|
|
|
|
self.header_session_id: Optional[str] = None
|
|
|
|
|
self.cookie_session_id: Optional[str] = None
|
|
|
|
|
self.session_expires_at: Optional[datetime] = None
|
|
|
|
|
self.lock = RLock()
|
|
|
|
|
|
|
|
|
|
if not self.verify_ssl:
|
|
|
|
|
requests.packages.urllib3.disable_warnings()
|
|
|
|
|
|
|
|
|
|
def login(self):
|
|
|
|
|
"""登录ERP"""
|
|
|
|
|
with self.lock:
|
|
|
|
|
url = f"{self.base_url}/{self.language_code}/{self.company_number}/login"
|
|
|
|
|
payload = {
|
|
|
|
|
"Username": self.username,
|
|
|
|
|
"Password": self.password,
|
|
|
|
|
"ForceRelogin": True,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
resp = self.session.post(url, json=payload, timeout=self.timeout, verify=self.verify_ssl)
|
|
|
|
|
|
|
|
|
|
if resp.status_code != 200:
|
|
|
|
|
raise Exception(f"登录失败: {resp.text}")
|
|
|
|
|
|
|
|
|
|
self.header_session_id = resp.headers.get("X-Monitor-SessionId")
|
|
|
|
|
self.cookie_session_id = resp.cookies.get("SessionId")
|
|
|
|
|
|
|
|
|
|
if not self.header_session_id and not self.cookie_session_id:
|
|
|
|
|
raise Exception("未获取到SessionId")
|
|
|
|
|
|
|
|
|
|
if self.cookie_session_id:
|
|
|
|
|
self.session.cookies.set("SessionId", self.cookie_session_id)
|
|
|
|
|
|
|
|
|
|
# Session有效期2小时
|
|
|
|
|
self.session_expires_at = datetime.now() + timedelta(hours=2)
|
|
|
|
|
|
|
|
|
|
logger.info(f"ERP登录成功: {self.username}")
|
|
|
|
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
logger.error(f"ERP登录失败: {e}")
|
|
|
|
|
raise
|
|
|
|
|
|
|
|
|
|
def _ensure_session(self):
|
|
|
|
|
"""确保Session有效"""
|
|
|
|
|
with self.lock:
|
|
|
|
|
# 检查是否需要登录
|
|
|
|
|
if not self.header_session_id and not self.cookie_session_id:
|
|
|
|
|
self.login()
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
# 检查是否即将过期(提前5分钟刷新)
|
|
|
|
|
if self.session_expires_at:
|
|
|
|
|
time_until_expire = (self.session_expires_at - datetime.now()).total_seconds()
|
|
|
|
|
if time_until_expire < 300:
|
|
|
|
|
logger.info(f"Session即将过期,重新登录: {self.username}")
|
|
|
|
|
self.login()
|
|
|
|
|
|
|
|
|
|
def request(self, method: str, path: str, **kwargs) -> Any:
|
|
|
|
|
"""发送HTTP请求(自动处理Session)"""
|
|
|
|
|
url = f"{self.base_url}{path}"
|
|
|
|
|
|
|
|
|
|
for attempt in range(2): # 最多重试1次
|
|
|
|
|
self._ensure_session()
|
|
|
|
|
|
|
|
|
|
headers = kwargs.pop("headers", {})
|
|
|
|
|
if self.header_session_id:
|
|
|
|
|
headers["X-Monitor-SessionId"] = self.header_session_id
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
resp = self.session.request(
|
|
|
|
|
method, url, headers=headers,
|
|
|
|
|
timeout=self.timeout, verify=self.verify_ssl, **kwargs
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
# Session失效,重新登录
|
|
|
|
|
if resp.status_code in (401, 403):
|
|
|
|
|
logger.warning(f"Session失效,重新登录: {self.username}")
|
|
|
|
|
with self.lock:
|
|
|
|
|
self.header_session_id = None
|
|
|
|
|
self.cookie_session_id = None
|
|
|
|
|
continue
|
|
|
|
|
|
|
|
|
|
if resp.status_code >= 400:
|
|
|
|
|
raise Exception(f"API请求失败: {resp.status_code} - {resp.text}")
|
|
|
|
|
|
|
|
|
|
return resp.json()
|
|
|
|
|
|
|
|
|
|
except requests.exceptions.Timeout:
|
|
|
|
|
raise Exception(f"请求超时: {url}")
|
|
|
|
|
except requests.exceptions.RequestException as e:
|
|
|
|
|
raise Exception(f"请求失败: {e}")
|
|
|
|
|
|
|
|
|
|
raise Exception("Session刷新后依然无效")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class ERPAPIManager:
|
|
|
|
|
"""ERP API管理器 - 支持多用户Session管理 + 封装业务接口"""
|
|
|
|
|
|
|
|
|
|
_instance = None
|
|
|
|
|
_sessions: Dict[str, ERPAPISession] = {} # username -> session
|
|
|
|
|
_config = None
|
|
|
|
|
_lock = RLock()
|
|
|
|
|
|
|
|
|
|
def __new__(cls):
|
|
|
|
|
"""单例模式"""
|
|
|
|
|
if cls._instance is None:
|
|
|
|
|
cls._instance = super().__new__(cls)
|
|
|
|
|
return cls._instance
|
|
|
|
|
|
|
|
|
|
def init_app(self, app):
|
|
|
|
|
"""初始化(在app启动时调用)"""
|
|
|
|
|
self._config = app.config.get("ERP_API_CONFIG", {})
|
|
|
|
|
logger.info("ERP API管理器初始化成功")
|
|
|
|
|
|
|
|
|
|
def _get_session(self, username: str, password: str) -> ERPAPISession:
|
|
|
|
|
"""
|
|
|
|
|
获取或创建用户Session(内部方法)
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
username: ERP用户名
|
|
|
|
|
password: ERP密码
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
用户的Session对象
|
|
|
|
|
"""
|
|
|
|
|
with self._lock:
|
|
|
|
|
# 如果已存在,直接返回
|
|
|
|
|
if username in self._sessions:
|
|
|
|
|
return self._sessions[username]
|
|
|
|
|
|
|
|
|
|
# 创建新Session
|
|
|
|
|
session = ERPAPISession(username, password, self._config)
|
|
|
|
|
self._sessions[username] = session
|
|
|
|
|
|
|
|
|
|
logger.info(f"创建新的ERP Session: {username}")
|
|
|
|
|
return session
|
|
|
|
|
|
|
|
|
|
def _request(
|
|
|
|
|
self,
|
|
|
|
|
method: str,
|
|
|
|
|
path: str,
|
|
|
|
|
username: Optional[str] = None,
|
|
|
|
|
password: Optional[str] = None,
|
|
|
|
|
**kwargs
|
|
|
|
|
) -> Any:
|
|
|
|
|
"""
|
|
|
|
|
发送API请求(内部方法)
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
method: HTTP方法
|
|
|
|
|
path: 请求路径
|
|
|
|
|
username: ERP用户名(可选,默认使用配置中的)
|
|
|
|
|
password: ERP密码(可选,默认使用配置中的)
|
|
|
|
|
**kwargs: 其他requests参数
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
响应JSON数据
|
|
|
|
|
"""
|
|
|
|
|
# 使用提供的用户名密码,或使用配置中的默认值
|
|
|
|
|
username = username or self._config.get("username")
|
|
|
|
|
password = password or self._config.get("password")
|
|
|
|
|
|
|
|
|
|
if not username or not password:
|
|
|
|
|
raise Exception("缺少ERP用户名或密码")
|
|
|
|
|
|
|
|
|
|
# 获取Session并发送请求
|
|
|
|
|
session = self._get_session(username, password)
|
|
|
|
|
return session.request(method, path, **kwargs)
|
|
|
|
|
|
|
|
|
|
# ==================== 业务接口方法 ====================
|
|
|
|
|
|
|
|
|
|
def report_manufacturing_order_operation(
|
|
|
|
|
self,
|
|
|
|
|
report_number: str,
|
|
|
|
|
reported_quantity: float,
|
|
|
|
|
username: Optional[str] = None,
|
|
|
|
|
password: Optional[str] = None
|
|
|
|
|
) -> Dict[str, Any]:
|
|
|
|
|
"""
|
|
|
|
|
工单工序报工
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
report_number: 报告号
|
|
|
|
|
reported_quantity: 报工数量
|
|
|
|
|
username: ERP用户名(可选)
|
|
|
|
|
password: ERP密码(可选)
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
报工结果
|
|
|
|
|
"""
|
|
|
|
|
path = f"/{self._config['language_code']}/{self._config['company_number']}/api/v1/Manufacturing/Reporting/ReportManufacturingOrderOperation"
|
|
|
|
|
|
|
|
|
|
payload = {
|
|
|
|
|
"ReportNumber": report_number,
|
|
|
|
|
"ReportedQuantity": reported_quantity,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
logger.info(f"ERP报工: ReportNumber={report_number}, Quantity={reported_quantity}, User={username}")
|
|
|
|
|
|
|
|
|
|
result = self._request("POST", path, json=payload, username=username, password=password)
|
|
|
|
|
|
|
|
|
|
logger.info(f"ERP报工成功: {result}")
|
|
|
|
|
return result
|
|
|
|
|
|
|
|
|
|
def report_material(
|
|
|
|
|
self,
|
|
|
|
|
manufacturing_order_operation_id: str,
|
|
|
|
|
part_id: str,
|
|
|
|
|
reported_quantity: Optional[float] = None,
|
|
|
|
|
username: Optional[str] = None,
|
|
|
|
|
password: Optional[str] = None
|
|
|
|
|
) -> Dict[str, Any]:
|
|
|
|
|
"""
|
|
|
|
|
物料报工
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
manufacturing_order_operation_id: 工序ID
|
|
|
|
|
part_id: 物料ID
|
|
|
|
|
reported_quantity: 报工数量(可选,不填则报告全部计划数量)
|
|
|
|
|
username: ERP用户名(可选)
|
|
|
|
|
password: ERP密码(可选)
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
报工结果
|
|
|
|
|
"""
|
|
|
|
|
path = f"/{self._config['language_code']}/{self._config['company_number']}/api/v1/Manufacturing/Reporting/ReportManufacturingOrderMaterial"
|
|
|
|
|
|
|
|
|
|
payload = {
|
|
|
|
|
"ManufacturingOrderOperationId": manufacturing_order_operation_id,
|
|
|
|
|
"PartId": part_id,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if reported_quantity is not None:
|
|
|
|
|
payload["ReportedQuantity"] = reported_quantity
|
|
|
|
|
|
|
|
|
|
logger.info(f"ERP物料报工: OperationId={manufacturing_order_operation_id}, PartId={part_id}")
|
|
|
|
|
|
|
|
|
|
result = self._request("POST", path, json=payload, username=username, password=password)
|
|
|
|
|
|
|
|
|
|
logger.info(f"ERP物料报工成功: {result}")
|
|
|
|
|
return result
|
|
|
|
|
|
|
|
|
|
def get_parts(
|
|
|
|
|
self,
|
|
|
|
|
filters: Optional[Dict[str, Any]] = None,
|
|
|
|
|
username: Optional[str] = None,
|
|
|
|
|
password: Optional[str] = None
|
|
|
|
|
) -> List[Dict[str, Any]]:
|
|
|
|
|
"""
|
|
|
|
|
获取零件列表
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
filters: OData查询参数,例如:
|
|
|
|
|
{
|
|
|
|
|
"$filter": "PartNumber eq 'ABC123'",
|
|
|
|
|
"$select": "Id,PartNumber,Description",
|
|
|
|
|
"$top": 10
|
|
|
|
|
}
|
|
|
|
|
username: ERP用户名(可选)
|
|
|
|
|
password: ERP密码(可选)
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
零件列表
|
|
|
|
|
"""
|
|
|
|
|
path = f"/{self._config['language_code']}/{self._config['company_number']}/api/v1/Inventory/Parts"
|
|
|
|
|
|
|
|
|
|
logger.info(f"ERP查询零件: filters={filters}")
|
|
|
|
|
|
|
|
|
|
result = self._request("GET", path, params=filters, username=username, password=password)
|
|
|
|
|
|
|
|
|
|
logger.info(f"ERP查询零件成功: 返回 {len(result)} 条")
|
|
|
|
|
return result
|
|
|
|
|
|
|
|
|
|
def get_manufacturing_order_operations(
|
|
|
|
|
self,
|
|
|
|
|
manufacturing_order_id: str,
|
|
|
|
|
username: Optional[str] = None,
|
|
|
|
|
password: Optional[str] = None
|
|
|
|
|
) -> List[Dict[str, Any]]:
|
|
|
|
|
"""
|
|
|
|
|
获取工单工序列表
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
manufacturing_order_id: 工单ID
|
|
|
|
|
username: ERP用户名(可选)
|
|
|
|
|
password: ERP密码(可选)
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
工序列表
|
|
|
|
|
"""
|
|
|
|
|
path = f"/{self._config['language_code']}/{self._config['company_number']}/api/v1/Manufacturing/ManufacturingOrderOperations"
|
|
|
|
|
|
|
|
|
|
filters = {
|
|
|
|
|
"$filter": f"ManufacturingOrderId eq '{manufacturing_order_id}'"
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
logger.info(f"ERP查询工序: ManufacturingOrderId={manufacturing_order_id}")
|
|
|
|
|
|
|
|
|
|
result = self._request("GET", path, params=filters, username=username, password=password)
|
|
|
|
|
|
|
|
|
|
logger.info(f"ERP查询工序成功: 返回 {len(result)} 条")
|
|
|
|
|
return result
|
|
|
|
|
|
|
|
|
|
def create_comment(
|
|
|
|
|
self,
|
|
|
|
|
entity_type: int,
|
|
|
|
|
entity_id: str,
|
|
|
|
|
text: str,
|
|
|
|
|
is_internal: bool = True,
|
|
|
|
|
username: Optional[str] = None,
|
|
|
|
|
password: Optional[str] = None
|
|
|
|
|
) -> Dict[str, Any]:
|
|
|
|
|
"""
|
|
|
|
|
创建评论
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
entity_type: 实体类型(ManufacturingOrder:12, ManufacturingOrderOperation:7)
|
|
|
|
|
entity_id: 实体ID
|
|
|
|
|
text: 评论内容
|
|
|
|
|
is_internal: 是否内部评论
|
|
|
|
|
username: ERP用户名(可选)
|
|
|
|
|
password: ERP密码(可选)
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
创建结果
|
|
|
|
|
"""
|
|
|
|
|
path = f"/{self._config['language_code']}/{self._config['company_number']}/api/v1/Common/Comments/Create"
|
|
|
|
|
|
|
|
|
|
payload = {
|
|
|
|
|
"EntityType": entity_type,
|
|
|
|
|
"EntityId": entity_id,
|
|
|
|
|
"Text": text,
|
|
|
|
|
"IsInternal": is_internal,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
logger.info(f"ERP创建评论: EntityType={entity_type}, EntityId={entity_id}")
|
|
|
|
|
|
|
|
|
|
result = self._request("POST", path, json=payload, username=username, password=password)
|
|
|
|
|
|
|
|
|
|
logger.info(f"ERP创建评论成功: {result}")
|
|
|
|
|
return result
|
|
|
|
|
|
|
|
|
|
# ==================== 管理方法 ====================
|
|
|
|
|
|
|
|
|
|
def clear_session(self, username: str):
|
|
|
|
|
"""清除指定用户的Session"""
|
|
|
|
|
with self._lock:
|
|
|
|
|
if username in self._sessions:
|
|
|
|
|
del self._sessions[username]
|
|
|
|
|
logger.info(f"清除ERP Session: {username}")
|
|
|
|
|
|
|
|
|
|
def clear_all_sessions(self):
|
|
|
|
|
"""清除所有Session"""
|
|
|
|
|
with self._lock:
|
|
|
|
|
self._sessions.clear()
|
|
|
|
|
logger.info("清除所有ERP Session")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# 全局实例
|
|
|
|
|
erp_api = ERPAPIManager()
|
|
|
|
|
|