You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

392 lines
13 KiB
Python

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

"""
ERP API管理器
提供基于requests的多用户Session管理和业务接口封装
"""
import requests
from threading import RLock
from datetime import datetime, timedelta
from typing import Optional, Dict, Any, List
import logging
# Module-level logger, named after this module; used by every class below.
logger = logging.getLogger(__name__)
class ERPAPISession:
    """API session for a single ERP user.

    Wraps one ``requests.Session`` together with the session identifiers
    returned by the ERP login endpoint. Login happens lazily on the first
    request and is repeated when the session is missing, close to its assumed
    expiry, or rejected by the server with 401/403.
    """

    def __init__(self, username: str, password: str, config: dict):
        """Store credentials and connection settings; performs no network I/O.

        Args:
            username: ERP user name.
            password: ERP password.
            config: Connection settings. ``base_url`` is required;
                ``language_code`` (default ``"zh"``), ``company_number``
                (default ``"001_1.1"``), ``verify_ssl`` (default ``False``)
                and ``timeout`` (default ``30``) are optional.

        Raises:
            KeyError: If ``config`` has no ``base_url`` entry.
        """
        self.username = username
        self.password = password
        self.config = config
        self.base_url = config["base_url"].rstrip("/")
        self.language_code = config.get("language_code", "zh")
        self.company_number = config.get("company_number", "001_1.1")
        # NOTE(review): certificate verification is OFF unless the config
        # enables it; confirm this default is intended for production.
        self.verify_ssl = config.get("verify_ssl", False)
        self.timeout = config.get("timeout", 30)
        self.session = requests.Session()
        self.header_session_id: Optional[str] = None
        self.cookie_session_id: Optional[str] = None
        self.session_expires_at: Optional[datetime] = None
        # Re-entrant because _ensure_session() calls login() while holding it.
        self.lock = RLock()
        if not self.verify_ssl:
            # Silences urllib3 warnings for the whole process, not just for
            # this session object.
            requests.packages.urllib3.disable_warnings()

    def login(self):
        """Log in to the ERP and record the returned session identifiers.

        The session id is read from the ``X-Monitor-SessionId`` response
        header and/or the ``SessionId`` cookie; at least one must be present.
        On success the session is treated as valid for two hours.

        Raises:
            Exception: If the server does not answer 200, if no session id is
                returned, or if the HTTP call itself fails (the underlying
                exception is logged and re-raised unchanged).
        """
        with self.lock:
            url = f"{self.base_url}/{self.language_code}/{self.company_number}/login"
            payload = {
                "Username": self.username,
                "Password": self.password,
                "ForceRelogin": True,
            }
            try:
                resp = self.session.post(
                    url, json=payload, timeout=self.timeout, verify=self.verify_ssl
                )
                if resp.status_code != 200:
                    raise Exception(f"登录失败: {resp.text}")
                self.header_session_id = resp.headers.get("X-Monitor-SessionId")
                self.cookie_session_id = resp.cookies.get("SessionId")
                if not self.header_session_id and not self.cookie_session_id:
                    raise Exception("未获取到SessionId")
                if self.cookie_session_id:
                    self.session.cookies.set("SessionId", self.cookie_session_id)
                # The session is treated as valid for 2 hours from now.
                self.session_expires_at = datetime.now() + timedelta(hours=2)
                logger.info(f"ERP登录成功: {self.username}")
            except Exception as e:
                logger.error(f"ERP登录失败: {e}")
                raise

    def _ensure_session(self):
        """Log in if there is no session yet or it expires within 5 minutes."""
        with self.lock:
            # No identifiers at all: never logged in, or invalidated by request().
            if not self.header_session_id and not self.cookie_session_id:
                self.login()
                return
            # Refresh ahead of time so a request does not race the expiry.
            if self.session_expires_at:
                time_until_expire = (
                    self.session_expires_at - datetime.now()
                ).total_seconds()
                if time_until_expire < 300:
                    logger.info(f"Session即将过期重新登录: {self.username}")
                    self.login()

    def request(self, method: str, path: str, **kwargs) -> Any:
        """Send an HTTP request, handling login and one re-login transparently.

        Args:
            method: HTTP method name, e.g. ``"GET"``.
            path: Path appended verbatim to ``base_url``.
            **kwargs: Extra arguments forwarded to ``requests.Session.request``.
                ``headers`` is merged with the session-id header; ``timeout``
                and ``verify`` override the session defaults when given.

        Returns:
            The decoded JSON body of the response.

        Raises:
            Exception: On timeout, transport failure, an HTTP status >= 400,
                or when the server still answers 401/403 after a re-login.
        """
        url = f"{self.base_url}{path}"
        # Take the caller's headers out of kwargs once, BEFORE the retry loop.
        # Popping inside the loop would hand the retry an empty header set.
        caller_headers = kwargs.pop("headers", None) or {}
        # Let callers override these per request instead of colliding with the
        # explicit keywords (which would raise TypeError).
        kwargs.setdefault("timeout", self.timeout)
        kwargs.setdefault("verify", self.verify_ssl)
        for _ in range(2):  # at most one retry
            self._ensure_session()
            # Work on a copy so the caller's dict is never mutated.
            headers = dict(caller_headers)
            if self.header_session_id:
                headers["X-Monitor-SessionId"] = self.header_session_id
            try:
                resp = self.session.request(method, url, headers=headers, **kwargs)
                # Session rejected: drop the ids so the next pass logs in again.
                if resp.status_code in (401, 403):
                    logger.warning(f"Session失效重新登录: {self.username}")
                    with self.lock:
                        self.header_session_id = None
                        self.cookie_session_id = None
                    continue
                if resp.status_code >= 400:
                    raise Exception(f"API请求失败: {resp.status_code} - {resp.text}")
                return resp.json()
            except requests.exceptions.Timeout as e:
                raise Exception(f"请求超时: {url}") from e
            except requests.exceptions.RequestException as e:
                raise Exception(f"请求失败: {e}") from e
        raise Exception("Session刷新后依然无效")
class ERPAPIManager:
    """ERP API manager: per-user session cache plus business endpoint wrappers.

    The class is a process-wide singleton. ``init_app`` must be called once
    with the application object before any request method is used.
    """

    _instance = None
    _sessions: Dict[str, "ERPAPISession"] = {}  # username -> session
    _config = None
    _lock = RLock()

    # Same fallbacks ERPAPISession applies when building the login URL, so
    # login and business endpoints always address the same company.
    _DEFAULT_LANGUAGE_CODE = "zh"
    _DEFAULT_COMPANY_NUMBER = "001_1.1"

    def __new__(cls):
        """Return the single shared instance, creating it on first use."""
        if cls._instance is None:
            with cls._lock:
                # Re-check under the lock so two threads racing on first use
                # cannot each create an instance.
                if cls._instance is None:
                    cls._instance = super().__new__(cls)
        return cls._instance

    def init_app(self, app):
        """Read ``ERP_API_CONFIG`` from ``app.config``; call at app start-up.

        Args:
            app: Application object exposing a dict-like ``config`` attribute.
        """
        self._config = app.config.get("ERP_API_CONFIG", {})
        logger.info("ERP API管理器初始化成功")

    def _require_config(self) -> dict:
        """Return the loaded configuration or fail with a clear message.

        Raises:
            Exception: If ``init_app`` has not been called yet.
        """
        if self._config is None:
            raise Exception("ERP API管理器未初始化")
        return self._config

    def _api_path(self, endpoint: str) -> str:
        """Build ``/<language>/<company>/api/v1/<endpoint>`` from the config."""
        config = self._require_config()
        language = config.get("language_code", self._DEFAULT_LANGUAGE_CODE)
        company = config.get("company_number", self._DEFAULT_COMPANY_NUMBER)
        return f"/{language}/{company}/api/v1/{endpoint}"

    def _get_session(self, username: str, password: str) -> "ERPAPISession":
        """Return the cached session for ``username`` or create a new one.

        A cached session is reused only when it was created with the same
        password. Otherwise it is replaced, so a wrong or changed password
        can never ride on another caller's authenticated session; the new
        session has to log in with the credentials actually supplied.

        Args:
            username: ERP user name.
            password: ERP password.

        Returns:
            The session object for this user.
        """
        with self._lock:
            cached = self._sessions.get(username)
            if cached is not None:
                if cached.password == password:
                    return cached
                logger.info(f"ERP凭据变更重建Session: {username}")
            session = ERPAPISession(username, password, self._require_config())
            self._sessions[username] = session
            logger.info(f"创建新的ERP Session: {username}")
            return session

    def _request(
        self,
        method: str,
        path: str,
        username: Optional[str] = None,
        password: Optional[str] = None,
        **kwargs
    ) -> Any:
        """Send an API request on behalf of a user.

        Args:
            method: HTTP method.
            path: Request path (appended to the configured base URL).
            username: ERP user name; falls back to the configured one.
            password: ERP password; falls back to the configured one.
            **kwargs: Extra arguments forwarded to the session's ``request``.

        Returns:
            The decoded JSON response.

        Raises:
            Exception: If the manager is not initialised or no credentials
                are available, plus anything the session's ``request`` raises.
        """
        config = self._require_config()
        # Explicit credentials win; otherwise use the configured defaults.
        username = username or config.get("username")
        password = password or config.get("password")
        if not username or not password:
            raise Exception("缺少ERP用户名或密码")
        session = self._get_session(username, password)
        return session.request(method, path, **kwargs)

    # ==================== Business endpoints ====================

    def report_manufacturing_order_operation(
        self,
        report_number: str,
        reported_quantity: float,
        username: Optional[str] = None,
        password: Optional[str] = None
    ) -> Dict[str, Any]:
        """Report progress on a manufacturing order operation.

        Args:
            report_number: Report number of the operation.
            reported_quantity: Quantity to report.
            username: ERP user name (optional).
            password: ERP password (optional).

        Returns:
            The reporting result returned by the ERP.
        """
        path = self._api_path(
            "Manufacturing/Reporting/ReportManufacturingOrderOperation"
        )
        payload = {
            "ReportNumber": report_number,
            "ReportedQuantity": reported_quantity,
        }
        logger.info(f"ERP报工: ReportNumber={report_number}, Quantity={reported_quantity}, User={username}")
        result = self._request("POST", path, json=payload, username=username, password=password)
        logger.info(f"ERP报工成功: {result}")
        return result

    def report_material(
        self,
        manufacturing_order_operation_id: str,
        part_id: str,
        reported_quantity: Optional[float] = None,
        username: Optional[str] = None,
        password: Optional[str] = None
    ) -> Dict[str, Any]:
        """Report material consumption for an operation.

        Args:
            manufacturing_order_operation_id: Operation id.
            part_id: Part (material) id.
            reported_quantity: Quantity to report. When omitted the field is
                not sent; per the original note the ERP then reports the full
                planned quantity.
            username: ERP user name (optional).
            password: ERP password (optional).

        Returns:
            The reporting result returned by the ERP.
        """
        path = self._api_path(
            "Manufacturing/Reporting/ReportManufacturingOrderMaterial"
        )
        payload = {
            "ManufacturingOrderOperationId": manufacturing_order_operation_id,
            "PartId": part_id,
        }
        if reported_quantity is not None:
            payload["ReportedQuantity"] = reported_quantity
        logger.info(f"ERP物料报工: OperationId={manufacturing_order_operation_id}, PartId={part_id}")
        result = self._request("POST", path, json=payload, username=username, password=password)
        logger.info(f"ERP物料报工成功: {result}")
        return result

    def get_parts(
        self,
        filters: Optional[Dict[str, Any]] = None,
        username: Optional[str] = None,
        password: Optional[str] = None
    ) -> List[Dict[str, Any]]:
        """Fetch the part list.

        Args:
            filters: OData query parameters sent as the query string, e.g.
                ``{"$filter": "PartNumber eq 'ABC123'",
                "$select": "Id,PartNumber,Description", "$top": 10}``.
                Values are passed through as given; callers building
                ``$filter`` from user input must escape quotes themselves.
            username: ERP user name (optional).
            password: ERP password (optional).

        Returns:
            The part list returned by the ERP.
        """
        path = self._api_path("Inventory/Parts")
        logger.info(f"ERP查询零件: filters={filters}")
        result = self._request("GET", path, params=filters, username=username, password=password)
        logger.info(f"ERP查询零件成功: 返回 {len(result)}")
        return result

    def get_manufacturing_order_operations(
        self,
        manufacturing_order_id: str,
        username: Optional[str] = None,
        password: Optional[str] = None
    ) -> List[Dict[str, Any]]:
        """Fetch the operations of one manufacturing order.

        Args:
            manufacturing_order_id: Manufacturing order id.
            username: ERP user name (optional).
            password: ERP password (optional).

        Returns:
            The operation list returned by the ERP.
        """
        path = self._api_path("Manufacturing/ManufacturingOrderOperations")
        # OData string literals escape a single quote by doubling it; without
        # this an id containing "'" would break out of the literal and change
        # the filter expression.
        escaped_id = str(manufacturing_order_id).replace("'", "''")
        filters = {
            "$filter": f"ManufacturingOrderId eq '{escaped_id}'"
        }
        logger.info(f"ERP查询工序: ManufacturingOrderId={manufacturing_order_id}")
        result = self._request("GET", path, params=filters, username=username, password=password)
        logger.info(f"ERP查询工序成功: 返回 {len(result)}")
        return result

    def create_comment(
        self,
        entity_type: int,
        entity_id: str,
        text: str,
        is_internal: bool = True,
        username: Optional[str] = None,
        password: Optional[str] = None
    ) -> Dict[str, Any]:
        """Create a comment on an ERP entity.

        Args:
            entity_type: Entity type code (per the original note:
                ManufacturingOrder is 12, ManufacturingOrderOperation is 7).
            entity_id: Entity id.
            text: Comment text.
            is_internal: Whether the comment is internal.
            username: ERP user name (optional).
            password: ERP password (optional).

        Returns:
            The creation result returned by the ERP.
        """
        path = self._api_path("Common/Comments/Create")
        payload = {
            "EntityType": entity_type,
            "EntityId": entity_id,
            "Text": text,
            "IsInternal": is_internal,
        }
        logger.info(f"ERP创建评论: EntityType={entity_type}, EntityId={entity_id}")
        result = self._request("POST", path, json=payload, username=username, password=password)
        logger.info(f"ERP创建评论成功: {result}")
        return result

    # ==================== Management ====================

    def clear_session(self, username: str):
        """Drop the cached session of ``username``, if there is one."""
        with self._lock:
            if self._sessions.pop(username, None) is not None:
                logger.info(f"清除ERP Session: {username}")

    def clear_all_sessions(self):
        """Drop every cached session."""
        with self._lock:
            self._sessions.clear()
            logger.info("清除所有ERP Session")
# Module-level instance; ERPAPIManager.__new__ always returns the same object,
# so every importer of `erp_api` shares it.
erp_api = ERPAPIManager()