""" ERP API管理器 提供基于requests的多用户Session管理和业务接口封装 """ import requests from threading import RLock from datetime import datetime, timedelta from typing import Optional, Dict, Any, List import logging logger = logging.getLogger(__name__) class ERPAPISession: """单个用户的API Session""" def __init__(self, username: str, password: str, config: dict): self.username = username self.password = password self.config = config self.base_url = config["base_url"].rstrip("/") self.language_code = config.get("language_code", "zh") self.company_number = config.get("company_number", "001_1.1") self.verify_ssl = config.get("verify_ssl", False) self.timeout = config.get("timeout", 30) self.session = requests.Session() self.header_session_id: Optional[str] = None self.cookie_session_id: Optional[str] = None self.session_expires_at: Optional[datetime] = None self.lock = RLock() if not self.verify_ssl: requests.packages.urllib3.disable_warnings() def login(self): """登录ERP""" with self.lock: url = f"{self.base_url}/{self.language_code}/{self.company_number}/login" payload = { "Username": self.username, "Password": self.password, "ForceRelogin": True, } try: resp = self.session.post(url, json=payload, timeout=self.timeout, verify=self.verify_ssl) if resp.status_code != 200: raise Exception(f"登录失败: {resp.text}") self.header_session_id = resp.headers.get("X-Monitor-SessionId") self.cookie_session_id = resp.cookies.get("SessionId") if not self.header_session_id and not self.cookie_session_id: raise Exception("未获取到SessionId") if self.cookie_session_id: self.session.cookies.set("SessionId", self.cookie_session_id) # Session有效期2小时 self.session_expires_at = datetime.now() + timedelta(hours=2) logger.info(f"ERP登录成功: {self.username}") except Exception as e: logger.error(f"ERP登录失败: {e}") raise def _ensure_session(self): """确保Session有效""" with self.lock: # 检查是否需要登录 if not self.header_session_id and not self.cookie_session_id: self.login() return # 检查是否即将过期(提前5分钟刷新) if self.session_expires_at: time_until_expire = (self.session_expires_at - datetime.now()).total_seconds() if time_until_expire < 300: logger.info(f"Session即将过期,重新登录: {self.username}") self.login() def request(self, method: str, path: str, **kwargs) -> Any: """发送HTTP请求(自动处理Session)""" url = f"{self.base_url}{path}" for attempt in range(2): # 最多重试1次 self._ensure_session() headers = kwargs.pop("headers", {}) if self.header_session_id: headers["X-Monitor-SessionId"] = self.header_session_id try: resp = self.session.request( method, url, headers=headers, timeout=self.timeout, verify=self.verify_ssl, **kwargs ) # Session失效,重新登录 if resp.status_code in (401, 403): logger.warning(f"Session失效,重新登录: {self.username}") with self.lock: self.header_session_id = None self.cookie_session_id = None continue if resp.status_code >= 400: raise Exception(f"API请求失败: {resp.status_code} - {resp.text}") return resp.json() except requests.exceptions.Timeout: raise Exception(f"请求超时: {url}") except requests.exceptions.RequestException as e: raise Exception(f"请求失败: {e}") raise Exception("Session刷新后依然无效") class ERPAPIManager: """ERP API管理器 - 支持多用户Session管理 + 封装业务接口""" _instance = None _sessions: Dict[str, ERPAPISession] = {} # username -> session _config = None _lock = RLock() def __new__(cls): """单例模式""" if cls._instance is None: cls._instance = super().__new__(cls) return cls._instance def init_app(self, app): """初始化(在app启动时调用)""" self._config = app.config.get("ERP_API_CONFIG", {}) logger.info("ERP API管理器初始化成功") def _get_session(self, username: str, password: str) -> ERPAPISession: """ 获取或创建用户Session(内部方法) Args: username: ERP用户名 password: ERP密码 Returns: 用户的Session对象 """ with self._lock: # 如果已存在,直接返回 if username in self._sessions: return self._sessions[username] # 创建新Session session = ERPAPISession(username, password, self._config) self._sessions[username] = session logger.info(f"创建新的ERP Session: {username}") return session def _request( self, method: str, path: str, username: Optional[str] = None, password: Optional[str] = None, **kwargs ) -> Any: """ 发送API请求(内部方法) Args: method: HTTP方法 path: 请求路径 username: ERP用户名(可选,默认使用配置中的) password: ERP密码(可选,默认使用配置中的) **kwargs: 其他requests参数 Returns: 响应JSON数据 """ # 使用提供的用户名密码,或使用配置中的默认值 username = username or self._config.get("username") password = password or self._config.get("password") if not username or not password: raise Exception("缺少ERP用户名或密码") # 获取Session并发送请求 session = self._get_session(username, password) return session.request(method, path, **kwargs) # ==================== 业务接口方法 ==================== def report_manufacturing_order_operation( self, report_number: str, reported_quantity: float, username: Optional[str] = None, password: Optional[str] = None ) -> Dict[str, Any]: """ 工单工序报工 Args: report_number: 报告号 reported_quantity: 报工数量 username: ERP用户名(可选) password: ERP密码(可选) Returns: 报工结果 """ path = f"/{self._config['language_code']}/{self._config['company_number']}/api/v1/Manufacturing/Reporting/ReportManufacturingOrderOperation" payload = { "ReportNumber": report_number, "ReportedQuantity": reported_quantity, } logger.info(f"ERP报工: ReportNumber={report_number}, Quantity={reported_quantity}, User={username}") result = self._request("POST", path, json=payload, username=username, password=password) logger.info(f"ERP报工成功: {result}") return result def report_material( self, manufacturing_order_operation_id: str, part_id: str, reported_quantity: Optional[float] = None, username: Optional[str] = None, password: Optional[str] = None ) -> Dict[str, Any]: """ 物料报工 Args: manufacturing_order_operation_id: 工序ID part_id: 物料ID reported_quantity: 报工数量(可选,不填则报告全部计划数量) username: ERP用户名(可选) password: ERP密码(可选) Returns: 报工结果 """ path = f"/{self._config['language_code']}/{self._config['company_number']}/api/v1/Manufacturing/Reporting/ReportManufacturingOrderMaterial" payload = { "ManufacturingOrderOperationId": manufacturing_order_operation_id, "PartId": part_id, } if reported_quantity is not None: payload["ReportedQuantity"] = reported_quantity logger.info(f"ERP物料报工: OperationId={manufacturing_order_operation_id}, PartId={part_id}") result = self._request("POST", path, json=payload, username=username, password=password) logger.info(f"ERP物料报工成功: {result}") return result def get_parts( self, filters: Optional[Dict[str, Any]] = None, username: Optional[str] = None, password: Optional[str] = None ) -> List[Dict[str, Any]]: """ 获取零件列表 Args: filters: OData查询参数,例如: { "$filter": "PartNumber eq 'ABC123'", "$select": "Id,PartNumber,Description", "$top": 10 } username: ERP用户名(可选) password: ERP密码(可选) Returns: 零件列表 """ path = f"/{self._config['language_code']}/{self._config['company_number']}/api/v1/Inventory/Parts" logger.info(f"ERP查询零件: filters={filters}") result = self._request("GET", path, params=filters, username=username, password=password) logger.info(f"ERP查询零件成功: 返回 {len(result)} 条") return result def get_manufacturing_order_operations( self, manufacturing_order_id: str, username: Optional[str] = None, password: Optional[str] = None ) -> List[Dict[str, Any]]: """ 获取工单工序列表 Args: manufacturing_order_id: 工单ID username: ERP用户名(可选) password: ERP密码(可选) Returns: 工序列表 """ path = f"/{self._config['language_code']}/{self._config['company_number']}/api/v1/Manufacturing/ManufacturingOrderOperations" filters = { "$filter": f"ManufacturingOrderId eq '{manufacturing_order_id}'" } logger.info(f"ERP查询工序: ManufacturingOrderId={manufacturing_order_id}") result = self._request("GET", path, params=filters, username=username, password=password) logger.info(f"ERP查询工序成功: 返回 {len(result)} 条") return result def create_comment( self, entity_type: int, entity_id: str, text: str, is_internal: bool = True, username: Optional[str] = None, password: Optional[str] = None ) -> Dict[str, Any]: """ 创建评论 Args: entity_type: 实体类型(ManufacturingOrder:12, ManufacturingOrderOperation:7) entity_id: 实体ID text: 评论内容 is_internal: 是否内部评论 username: ERP用户名(可选) password: ERP密码(可选) Returns: 创建结果 """ path = f"/{self._config['language_code']}/{self._config['company_number']}/api/v1/Common/Comments/Create" payload = { "EntityType": entity_type, "EntityId": entity_id, "Text": text, "IsInternal": is_internal, } logger.info(f"ERP创建评论: EntityType={entity_type}, EntityId={entity_id}") result = self._request("POST", path, json=payload, username=username, password=password) logger.info(f"ERP创建评论成功: {result}") return result # ==================== 管理方法 ==================== def clear_session(self, username: str): """清除指定用户的Session""" with self._lock: if username in self._sessions: del self._sessions[username] logger.info(f"清除ERP Session: {username}") def clear_all_sessions(self): """清除所有Session""" with self._lock: self._sessions.clear() logger.info("清除所有ERP Session") # 全局实例 erp_api = ERPAPIManager()