diff --git a/docs/EXCHANGE.md b/docs/EXCHANGE.md index 8871dee..cbf1cdf 100644 --- a/docs/EXCHANGE.md +++ b/docs/EXCHANGE.md @@ -1,67 +1,121 @@ # 模板与导入导出 -iTi-Flask 提供统一的数据模板、导入和导出基础能力。 -框架提供模板源抽象、对象、存储、任务和文件处理。 -业务系统自己决定入口、字段语义、模板中心和回调处理。 - -## 设计边界 - -- 模板按业务实体建模,不按单表字段映射建模。 -- 导入模板和导出模板独立。 -- 模板支持编辑和版本管理。 -- 模板也支持上传 Excel 作为来源。 -- 执行时按任务显式选择已发布版本。 -- 导入和导出都走任务。 -- 没有模板时,框架按字段映射兜底。 -- 模板源可来自本地表、远程模板中心、业务自定义 provider 或纯映射输入。 -- `system` 不是能力开关。 -- 业务项目可以自建模板中心,也可以挂远程模板中心 RPC。 -- 只要实现 `ExchangeSource` 并注册进 `app.state.iti_exchange`,就能接入同一套计划解析和模板文件生成入口。 - -## 框架对象 - -- `ExchangeTemplate` -- `ExchangeTemplateSnapshot` -- `ExchangePlan` -- `ExchangeTemplateBinding` -- `ExchangeField` -- `ExchangePlaceholder` -- `ExchangeTemplateSource` -- `ExchangeTemplateSourceKind` -- `ExchangeTaskKind` -- `ExchangeSource` +iTi-Flask 提供模板中心、模板文件、导入导出任务和业务规格注册能力。 +具体业务字段、变量含义和执行逻辑由业务模块提供。 + +## 业务范围 + +一个导入导出能力由三段业务键定位: + +- `biz_domain`:业务域,例如 `system`、`mes`、`qos`。 +- `biz_obj`:业务对象,例如 `user`、`work_order`。 +- `operation`:操作,目前是 `import` 或 `export`。 -## 入口 +模板编码默认由这三段生成: -- `iti.exchange` -- `iti.exchange.service.ExchangeService` -- `iti.exchange.routes.router` -- `iti.exchange.module.create_exchange_module()` +```text +system.user.import +``` + +前端可以调用 `/exchange/templates/code` 生成,也可以创建模板时不传 `code`,后端自动生成。 + +## 业务规格 -## 配置 +模板变量不是运维动态维护的字典。 +变量由业务模块注册,模板中心只展示和保存版本快照。 ```python -class DevConfig(BaseDevConfig): - def __init__(self) -> None: - super().__init__() - self.exchange_enabled = True +from iti.exchange import ( + ExchangeBusinessSpec, + ExchangeOperation, + ExchangeScope, + ExchangeTemplateLayout, + ExchangeTaskResult, + ExchangeVariable, + register_exchange_spec, +) + + +def import_users(context): + return ExchangeTaskResult(success_count=1) + + +def register_tasks(self, app): + register_exchange_spec( + app, + ExchangeBusinessSpec( + scope=ExchangeScope("system", "user", ExchangeOperation.IMPORT), + name="用户导入", + description="导入系统用户", + layout=ExchangeTemplateLayout(title="用户导入", sheet_name="用户", header_row=2), + variables=( + ExchangeVariable(key="username", label="用户名", required=True, example="alice"), + ExchangeVariable(key="mobile", label="手机号"), + ), + ), + handler=import_users, + ) ``` -需要文件存储时复用 `file_storage`。 +前端维护模板时,先从 `/exchange/catalog` 聚合查询业务范围。 +选定范围后,页面展示该范围的变量、示例和使用方式。 + +## 模板 + +模板记录保存: -## 业务接法 +- `code` +- `name` +- `biz_domain` +- `biz_obj` +- `operation` +- `layout` +- `current_version` +- `status` -业务项目可直接注册 `create_exchange_module()`,也可以自己写模块,只复用 `ExchangeService`、`ExchangeSource`、`register_exchange_source()` 和任务注册接口。 +`layout` 只保存解析需要的标记,例如 `title_row`、`header_row`、`data_start_row`。 +样式属于 Excel 文件本身,不进入导入导出数据模型。 -业务通常要自己补: +版本记录保存: -- 模板字段和 placeholder 定义。 -- 模板发布流程。 -- 模板中心 RPC、本地表维护方式,或自定义 source 注册方式。 -- 导入回执和导出文件命名。 -- 任务执行器里的实际业务处理。 -- 菜单和页面入口。 +- 模板文件位置 +- 校验值 +- 布局快照 +- 变量快照 -模板中心可以由 `system` 提供,也可以由业务项目自建。框架只提供统一的计划解析、模板文件生成、Excel 读写和 source 接入层。 +变量快照用于保证历史版本可复现。 +业务模块改了变量定义,不会影响已发布版本。 + +## 执行 + +创建任务时传业务范围和可选模板版本。 +框架解析模板计划后创建 `exchange_tasks`。 +调用 `/exchange/tasks/{task_id}/run` 时,框架按业务范围找到注册的 handler,并把 `ExchangeTaskContext` 交给业务模块。 + +框架只管理任务状态、行结果和文件读写。 +业务模块负责真实导入、导出、校验、回执和业务事务。 + +## Source + +模板计划可来自: + +- 本地模板中心:默认。 +- 远程模板中心:`sourceKind=remote`。 +- 自定义 source:注册 `register_exchange_source()`。 +- 纯映射输入:显式 `sourceKind=mapping`。 + +## 主要对象 + +- `ExchangeBusinessSpec` +- `ExchangeScope` +- `ExchangeOperation` +- `ExchangeVariable` +- `ExchangeTemplateLayout` +- `ExchangeTemplatePlan` +- `ExchangeTemplateSnapshot` +- `ExchangeTaskContext` +- `ExchangeTaskResult` +- `ExchangeSource` -Excel 数据处理走 `pandas`。`openpyxl` 只处理模板结构和格式。 +Excel 数据处理走 `pandas`。 +模板文件生成和上传解析走 `openpyxl`。 diff --git a/docs/MODULES.md b/docs/MODULES.md index 2dffcbc..7883c56 100644 --- a/docs/MODULES.md +++ b/docs/MODULES.md @@ -80,7 +80,8 @@ def register_permissions(self, app): ## 模板与导入导出 框架内置的交换能力由 `iti.exchange.module.create_exchange_module()` 提供。 -业务模块可以直接复用它,也可以只复用 `ExchangeService`、`ExchangeSource`、`register_exchange_source()`、`register_exchange_task()` 和 `router`。 +业务模块通过 `register_exchange_spec()` 注册 `biz_domain`、`biz_obj`、`operation`、模板变量和 handler。 +模板中心可以聚合这些规格,前端据此展示业务范围和变量说明。 模板中心可以由 `system` 承载,也可以由业务模块自建。框架侧能力不依赖 `system` 是否存在。 ```python diff --git a/iti/exchange/__init__.py b/iti/exchange/__init__.py index fcfca9e..cda74c4 100644 --- a/iti/exchange/__init__.py +++ b/iti/exchange/__init__.py @@ -1,22 +1,25 @@ from .base import ( DataExchangeModule, - ExchangeField, - ExchangePlaceholder, - ExchangePlan, - ExchangeTemplate, - ExchangeTemplateBinding, - ExchangeTemplateKind, - ExchangeTemplateSource, - ExchangeTemplateSourceKind, + ExchangeBusinessSpec, + ExchangeOperation, + ExchangeScope, + ExchangeTaskContext, + ExchangeTaskHandler, + ExchangeTaskResult, + ExchangeTemplateLayout, + ExchangeTemplatePlan, ExchangeTemplateSnapshot, - ExchangeTaskKind, + ExchangeTemplateSourceKind, + ExchangeVariable, ) -from .plan import ExchangeMappingPlanInput +from .plan import ExchangePlanInput from .registry import ( ExchangeRegistry, get_exchange_registry, get_exchange_source_by_name, + register_exchange_handler, register_exchange_source, + register_exchange_spec, ) from .sources import ( ExchangeSource, @@ -29,25 +32,28 @@ from .tasks import register_exchange_task __all__ = [ "DataExchangeModule", - "ExchangeField", - "ExchangeMappingPlanInput", - "ExchangePlaceholder", - "ExchangePlan", + "ExchangeBusinessSpec", + "ExchangeOperation", + "ExchangePlanInput", "ExchangeRegistry", - "ExchangeTemplate", - "ExchangeTemplateBinding", - "ExchangeTemplateKind", - "ExchangeTemplateSource", - "ExchangeTemplateSourceKind", - "ExchangeTemplateSnapshot", - "ExchangeTaskKind", + "ExchangeScope", "ExchangeSource", + "ExchangeTaskContext", + "ExchangeTaskHandler", + "ExchangeTaskResult", + "ExchangeTemplateLayout", + "ExchangeTemplatePlan", + "ExchangeTemplateSnapshot", + "ExchangeTemplateSourceKind", + "ExchangeVariable", "LocalExchangeSource", + "MappingExchangeSource", + "RemoteExchangeSource", "get_exchange_registry", - "get_exchange_source_by_name", "get_exchange_source", - "MappingExchangeSource", + "get_exchange_source_by_name", + "register_exchange_handler", "register_exchange_source", + "register_exchange_spec", "register_exchange_task", - "RemoteExchangeSource", ] diff --git a/iti/exchange/base.py b/iti/exchange/base.py index 9b3bfee..1e9efc0 100644 --- a/iti/exchange/base.py +++ b/iti/exchange/base.py @@ -1,232 +1,246 @@ from __future__ import annotations -from dataclasses import dataclass, field +from dataclasses import asdict, dataclass, field from enum import Enum from typing import Any, Protocol, Sequence -class ExchangeTemplateKind(str, Enum): +class ExchangeOperation(str, Enum): IMPORT = "import" EXPORT = "export" -class ExchangeTaskKind(str, Enum): - IMPORT = "import" - EXPORT = "export" +class ExchangeTemplateSourceKind(str, Enum): + LOCAL = "local" + REMOTE = "remote" + MAPPING = "mapping" + CUSTOM = "custom" @dataclass(frozen=True) -class ExchangePlaceholder: - key: str - label: str - description: str | None = None - required: bool = False - example: str | None = None +class ExchangeScope: + biz_domain: str + biz_obj: str + operation: ExchangeOperation | str + + @classmethod + def from_mapping( + cls, + *, + biz_domain: str, + biz_obj: str, + operation: ExchangeOperation | str, + ) -> "ExchangeScope": + return cls( + biz_domain=biz_domain, + biz_obj=biz_obj, + operation=operation, + ) + + def key(self) -> str: + return f"{_slug_token(self.biz_domain)}:{_slug_token(self.biz_obj)}:{_slug_token(_operation_value(self.operation))}" + + def code(self) -> str: + return ".".join( + [ + _slug_token(self.biz_domain), + _slug_token(self.biz_obj), + _slug_token(_operation_value(self.operation)), + ] + ) @dataclass(frozen=True) -class ExchangeField: +class ExchangeVariable: key: str label: str - placeholder: str | None = None + header: str | None = None + description: str | None = None required: bool = False example: str | None = None - width: int | None = None - format: str | None = None - source: str | None = None - target: str | None = None - options: tuple[tuple[str, str], ...] = () - meta: dict[str, Any] = field(default_factory=dict) def workbook_header(self) -> str: - return self.placeholder or self.label or self.key - - def export_source_key(self) -> str: - return self.source or self.key - - def import_target_key(self) -> str: - return self.target or self.key + return self.header or self.label or self.key @dataclass(frozen=True) -class ExchangeTemplateBinding: - entity: str - template_kind: ExchangeTemplateKind - handler: str | None = None - description: str | None = None - default_sheet_name: str | None = None - default_file_name: str | None = None +class ExchangeTemplateLayout: title: str | None = None - meta: dict[str, Any] = field(default_factory=dict) - - -class ExchangeTemplateSourceKind(str, Enum): - LOCAL = "local" - REMOTE = "remote" - MAPPING = "mapping" - CUSTOM = "custom" - - -@dataclass(frozen=True) -class ExchangeTemplateSnapshot: - id: str - version: str - template_id: str - template_kind: ExchangeTemplateKind - bindings: tuple[ExchangeTemplateBinding, ...] = () - published_at: str | None = None - file_key: str | None = None - checksum: str | None = None - fields: tuple[ExchangeField, ...] = () - placeholders: tuple[ExchangePlaceholder, ...] = () - meta: dict[str, Any] = field(default_factory=dict) - - def to_plan(self) -> "ExchangePlan": - meta = dict(self.meta) - return ExchangePlan( - template_kind=self.template_kind, - template_id=self.template_id, - version_id=self.id, - version=self.version, - bindings=self.bindings, - fields=self.fields, - placeholders=self.placeholders, - title=meta.get("title"), - description=meta.get("description"), - sheet_name=meta.get("sheet_name"), - meta=meta, - ) + sheet_name: str | None = None + title_row: int | None = 1 + header_row: int = 2 + data_start_row: int | None = None @dataclass(frozen=True) -class ExchangePlan: - template_kind: ExchangeTemplateKind +class ExchangeTemplatePlan: + scope: ExchangeScope + code: str | None = None + name: str | None = None + description: str | None = None template_id: str | None = None version_id: str | None = None version: str | None = None - bindings: tuple[ExchangeTemplateBinding, ...] = () - fields: tuple[ExchangeField, ...] = () - placeholders: tuple[ExchangePlaceholder, ...] = () - title: str | None = None - description: str | None = None - sheet_name: str | None = None - meta: dict[str, Any] = field(default_factory=dict) + layout: ExchangeTemplateLayout = field(default_factory=ExchangeTemplateLayout) + variables: tuple[ExchangeVariable, ...] = () @classmethod def from_mapping( cls, *, - template_kind: ExchangeTemplateKind | str, + biz_domain: str, + biz_obj: str, + operation: ExchangeOperation | str, + code: str | None = None, + name: str | None = None, + description: str | None = None, template_id: str | None = None, version_id: str | None = None, version: str | None = None, - bindings: Sequence[ExchangeTemplateBinding] | None = None, - fields: Sequence[ExchangeField] | None = None, - placeholders: Sequence[ExchangePlaceholder] | None = None, - title: str | None = None, - description: str | None = None, - sheet_name: str | None = None, - meta: dict[str, Any] | None = None, - ) -> "ExchangePlan": + layout: ExchangeTemplateLayout | dict[str, Any] | None = None, + variables: Sequence[ExchangeVariable] | None = None, + ) -> "ExchangeTemplatePlan": return cls( - template_kind=ExchangeTemplateKind(template_kind), + scope=ExchangeScope.from_mapping( + biz_domain=biz_domain, + biz_obj=biz_obj, + operation=operation, + ), + code=code, + name=name, + description=description, template_id=template_id, version_id=version_id, version=version, - bindings=tuple(bindings or ()), - fields=tuple(fields or ()), - placeholders=tuple(placeholders or ()), - title=title, - description=description, - sheet_name=sheet_name, - meta=meta or {}, + layout=_coerce_layout(layout), + variables=tuple(variables or ()), ) - def resolved_meta(self) -> dict[str, Any]: - meta = dict(self.meta) - if self.template_id is not None: - meta.setdefault("template_id", self.template_id) - if self.version_id is not None: - meta.setdefault("version_id", self.version_id) - if self.version is not None: - meta.setdefault("version", self.version) - if self.title is not None: - meta.setdefault("title", self.title) - if self.description is not None: - meta.setdefault("description", self.description) - if self.sheet_name is not None: - meta.setdefault("sheet_name", self.sheet_name) - return meta - + def generated_code(self) -> str: + return self.code or self.scope.code() -@dataclass(frozen=True) -class ExchangeTemplateSource: - kind: ExchangeTemplateSourceKind - template_kind: ExchangeTemplateKind - template_id: str | None = None - version_id: str | None = None - version: str | None = None - service: str | None = None - bindings: tuple[ExchangeTemplateBinding, ...] = () - fields: tuple[ExchangeField, ...] = () - placeholders: tuple[ExchangePlaceholder, ...] = () - title: str | None = None - description: str | None = None - sheet_name: str | None = None - meta: dict[str, Any] = field(default_factory=dict) - - def to_plan(self) -> ExchangePlan: - meta = dict(self.meta) - return ExchangePlan( - template_kind=self.template_kind, + def to_snapshot( + self, + *, + published_at: str | None = None, + file_key: str | None = None, + checksum: str | None = None, + ) -> "ExchangeTemplateSnapshot": + return ExchangeTemplateSnapshot( + scope=self.scope, + code=self.code, + name=self.name, + description=self.description, template_id=self.template_id, version_id=self.version_id, version=self.version, - bindings=self.bindings, - fields=self.fields, - placeholders=self.placeholders, - title=self.title, - description=self.description, - sheet_name=self.sheet_name, - meta=meta, + layout=self.layout, + variables=self.variables, + published_at=published_at, + file_key=file_key, + checksum=checksum, + ) + + def as_payload(self) -> dict[str, Any]: + return { + "scope": _scope_payload(self.scope), + "code": self.generated_code(), + "name": self.name, + "description": self.description, + "template_id": self.template_id, + "version_id": self.version_id, + "version": self.version, + "layout": asdict(self.layout), + "variables": [asdict(item) for item in self.variables], + } + + +@dataclass(frozen=True) +class ExchangeTemplateSnapshot(ExchangeTemplatePlan): + published_at: str | None = None + file_key: str | None = None + checksum: str | None = None + + def as_payload(self) -> dict[str, Any]: + payload = super().as_payload() + payload.update( + { + "published_at": self.published_at, + "file_key": self.file_key, + "checksum": self.checksum, + } ) + return payload @dataclass(frozen=True) -class ExchangeTemplate: - id: str - code: str +class ExchangeBusinessSpec: + scope: ExchangeScope name: str - template_kind: ExchangeTemplateKind - entity: str - status: str = "draft" description: str | None = None - current_version: str | None = None - bindings: tuple[ExchangeTemplateBinding, ...] = () - fields: tuple[ExchangeField, ...] = () - placeholders: tuple[ExchangePlaceholder, ...] = () - meta: dict[str, Any] = field(default_factory=dict) - - def to_plan(self) -> ExchangePlan: - meta = dict(self.meta) - return ExchangePlan( - template_kind=self.template_kind, - template_id=self.id, - version_id=self.current_version, - bindings=self.bindings, - fields=self.fields, - placeholders=self.placeholders, - title=self.name, + layout: ExchangeTemplateLayout = field(default_factory=ExchangeTemplateLayout) + variables: tuple[ExchangeVariable, ...] = () + code: str | None = None + handler_name: str | None = None + + def generated_code(self) -> str: + return self.code or self.scope.code() + + def to_plan( + self, + *, + template_id: str | None = None, + version_id: str | None = None, + version: str | None = None, + ) -> ExchangeTemplatePlan: + return ExchangeTemplatePlan( + scope=self.scope, + code=self.generated_code(), + name=self.name, description=self.description, - sheet_name=meta.get("sheet_name"), - meta={ - "code": self.code, - "status": self.status, - "current_version": self.current_version, - **meta, - }, + template_id=template_id, + version_id=version_id, + version=version, + layout=self.layout, + variables=self.variables, ) + def as_payload(self) -> dict[str, Any]: + payload = { + "scope": _scope_payload(self.scope), + "code": self.generated_code(), + "name": self.name, + "description": self.description, + "layout": asdict(self.layout), + "variables": [asdict(item) for item in self.variables], + } + if self.handler_name is not None: + payload["handler_name"] = self.handler_name + return payload + + +@dataclass(frozen=True) +class ExchangeTaskContext: + task_id: str + plan: ExchangeTemplatePlan + snapshot: ExchangeTemplateSnapshot | None = None + storage_key: str | None = None + payload: dict[str, Any] = field(default_factory=dict) + requested_by: str | None = None + + +@dataclass(frozen=True) +class ExchangeTaskResult: + success_count: int = 0 + failed_count: int = 0 + message: str | None = None + result_payload: dict[str, Any] = field(default_factory=dict) + + +class ExchangeTaskHandler(Protocol): + def __call__(self, context: ExchangeTaskContext) -> ExchangeTaskResult: + ... + class DataExchangeModule(Protocol): name: str @@ -245,3 +259,49 @@ class DataExchangeModule(Protocol): def register_tasks(self, app) -> None: ... + + +def _coerce_layout(value: ExchangeTemplateLayout | dict[str, Any] | None) -> ExchangeTemplateLayout: + if value is None: + return ExchangeTemplateLayout() + if isinstance(value, ExchangeTemplateLayout): + return value + return ExchangeTemplateLayout( + title=value.get("title"), + sheet_name=value.get("sheet_name") or value.get("sheetName"), + title_row=value.get("title_row", value.get("titleRow", 1)), + header_row=value.get("header_row", value.get("headerRow", 2)), + data_start_row=value.get("data_start_row") or value.get("dataStartRow"), + ) + + +def _operation_value(value: ExchangeOperation | str) -> str: + return value.value if isinstance(value, ExchangeOperation) else str(value) + + +def _scope_payload(scope: ExchangeScope) -> dict[str, str]: + return { + "biz_domain": scope.biz_domain, + "biz_obj": scope.biz_obj, + "operation": _operation_value(scope.operation), + } + + +def _slug_token(value: str) -> str: + normalized = [] + previous_underscore = False + for char in str(value).strip().lower(): + if char.isalnum(): + normalized.append(char) + previous_underscore = False + continue + if char in {"_", "-"}: + if not previous_underscore: + normalized.append("_") + previous_underscore = True + continue + if not previous_underscore: + normalized.append("_") + previous_underscore = True + token = "".join(normalized).strip("_") + return token or "item" diff --git a/iti/exchange/excel.py b/iti/exchange/excel.py index b74a274..c61bb1d 100644 --- a/iti/exchange/excel.py +++ b/iti/exchange/excel.py @@ -8,235 +8,104 @@ import pandas as pd from openpyxl import Workbook, load_workbook from openpyxl.worksheet.worksheet import Worksheet -from .base import ExchangeField, ExchangePlaceholder, ExchangePlan, ExchangeTemplateSnapshot +from .base import ExchangeTemplatePlan, ExchangeTemplateSnapshot, ExchangeVariable @dataclass class ExcelTemplateCodec: - """Render and parse template workbooks.""" + """Render and parse lightweight template workbooks.""" - def build_workbook(self, snapshot: ExchangeTemplateSnapshot | ExchangePlan) -> Workbook: + def build_workbook(self, plan: ExchangeTemplateSnapshot | ExchangeTemplatePlan) -> Workbook: workbook = Workbook() worksheet = workbook.active - sheet_name = getattr(snapshot, "sheet_name", None) - worksheet.title = ( - sheet_name - or snapshot.meta.get("sheet_name") - or (snapshot.bindings[0].default_sheet_name if snapshot.bindings else None) - or _safe_sheet_name("Template") - ) - row = self._write_header(worksheet, snapshot) - row = self._write_bindings(worksheet, snapshot, row) - row = self._write_placeholders(worksheet, snapshot, row) - self._write_fields(worksheet, snapshot, row) + worksheet.title = _safe_sheet_name(plan.layout.sheet_name or "Template") + self._write_header(worksheet, plan) + self._write_variables(worksheet, plan) return workbook def _write_header( - self, worksheet: Worksheet, snapshot: ExchangeTemplateSnapshot | ExchangePlan - ) -> int: - meta = snapshot.meta if hasattr(snapshot, "meta") else {} - title = getattr(snapshot, "title", None) - version = getattr(snapshot, "version", None) - worksheet["A1"] = title or meta.get("title") or snapshot.template_id or "Template" - if version: - worksheet["A2"] = f"version: {version}" - elif meta.get("version"): - worksheet["A2"] = f"version: {meta['version']}" - description = getattr(snapshot, "description", None) - if meta.get("description") or description: - worksheet["A3"] = meta.get("description") or description - return 5 - - def _write_bindings( - self, worksheet: Worksheet, snapshot: ExchangeTemplateSnapshot | ExchangePlan, row: int - ) -> int: - if not snapshot.bindings: - return row - worksheet.cell(row=row, column=1, value="Bindings") - row += 1 - headers = [ - "entity", - "template_kind", - "handler", - "description", - "default_sheet_name", - "default_file_name", - "title", - ] - for col, value in enumerate(headers, start=1): - worksheet.cell(row=row, column=col, value=value) - row += 1 - for binding in snapshot.bindings: - values = [ - binding.entity, - _enum_value(binding.template_kind), - binding.handler, - binding.description, - binding.default_sheet_name, - binding.default_file_name, - binding.title, - ] - for col, value in enumerate(values, start=1): - worksheet.cell(row=row, column=col, value=value) - row += 1 - return row - - def _write_placeholders( - self, worksheet: Worksheet, snapshot: ExchangeTemplateSnapshot | ExchangePlan, row: int - ) -> int: - if not snapshot.placeholders: - return row - worksheet.cell(row=row, column=1, value="Placeholders") - row += 1 - for placeholder in snapshot.placeholders: - worksheet.cell(row=row, column=1, value=placeholder.key) - worksheet.cell(row=row, column=2, value=placeholder.label) - worksheet.cell(row=row, column=3, value=placeholder.description) - worksheet.cell(row=row, column=4, value=placeholder.example) - worksheet.cell(row=row, column=5, value=placeholder.required) - row += 1 - return row - - def _write_fields( - self, worksheet: Worksheet, snapshot: ExchangeTemplateSnapshot | ExchangePlan, row: int + self, + worksheet: Worksheet, + plan: ExchangeTemplateSnapshot | ExchangeTemplatePlan, + ) -> None: + worksheet["A1"] = plan.layout.title or plan.name or plan.generated_code() + worksheet["A2"] = "biz_domain" + worksheet["B2"] = plan.scope.biz_domain + worksheet["A3"] = "biz_obj" + worksheet["B3"] = plan.scope.biz_obj + worksheet["A4"] = "operation" + worksheet["B4"] = str(plan.scope.operation) + if plan.version: + worksheet["A5"] = "version" + worksheet["B5"] = plan.version + if plan.description: + worksheet["A6"] = "description" + worksheet["B6"] = plan.description + + def _write_variables( + self, + worksheet: Worksheet, + plan: ExchangeTemplateSnapshot | ExchangeTemplatePlan, ) -> None: - worksheet.cell(row=row, column=1, value="Fields") - row += 1 - headers = [ - "key", - "label", - "placeholder", - "required", - "example", - "format", - "source", - "target", - ] + start_row = 8 + worksheet.cell(row=start_row, column=1, value="Variables") + headers = ["key", "label", "header", "required", "example", "description"] for col, value in enumerate(headers, start=1): - worksheet.cell(row=row, column=col, value=value) - row += 1 - for field in snapshot.fields: - values = [ - field.key, - field.label, - field.placeholder, - field.required, - field.example, - field.format, - field.source, - field.target, - ] - for col, value in enumerate(values, start=1): - worksheet.cell(row=row, column=col, value=value) - row += 1 - - def dump(self, snapshot: ExchangeTemplateSnapshot | ExchangePlan) -> bytes: + worksheet.cell(row=start_row + 1, column=col, value=value) + for row_index, variable in enumerate(plan.variables, start=start_row + 2): + worksheet.cell(row=row_index, column=1, value=variable.key) + worksheet.cell(row=row_index, column=2, value=variable.label) + worksheet.cell(row=row_index, column=3, value=variable.header) + worksheet.cell(row=row_index, column=4, value=variable.required) + worksheet.cell(row=row_index, column=5, value=variable.example) + worksheet.cell(row=row_index, column=6, value=variable.description) + + def dump(self, plan: ExchangeTemplateSnapshot | ExchangeTemplatePlan) -> bytes: buffer = BytesIO() - self.build_workbook(snapshot).save(buffer) + self.build_workbook(plan).save(buffer) return buffer.getvalue() def load(self, content: bytes) -> dict[str, Any]: - workbook = load_workbook(BytesIO(content)) + workbook = load_workbook(BytesIO(content), data_only=True) worksheet = workbook.active payload = { "title": worksheet["A1"].value, - "version": worksheet["A2"].value, - "description": worksheet["A3"].value, "sheet_name": worksheet.title, + "variables": self._parse_variables(worksheet), } - payload["bindings"], payload["placeholders"], payload["fields"] = self._parse_sections( - worksheet - ) + workbook.close() return payload - def _parse_sections( - self, worksheet: Worksheet - ) -> tuple[list[dict[str, Any]], list[dict[str, Any]], list[dict[str, Any]]]: - bindings: list[dict[str, Any]] = [] - placeholders: list[dict[str, Any]] = [] - fields: list[dict[str, Any]] = [] + def _parse_variables(self, worksheet: Worksheet) -> list[dict[str, Any]]: + variables: list[dict[str, Any]] = [] mode: str | None = None headers: list[str] = [] - for row in worksheet.iter_rows(values_only=True): - cells = [cell for cell in row] + cells = list(row) first = cells[0] if cells else None - if first == "Bindings": - mode = "bindings_headers" - headers = [] + if first == "Variables": + mode = "headers" continue - if first == "Placeholders": - mode = "placeholders" - continue - if first == "Fields": - mode = "fields_headers" - headers = [] - continue - if mode == "bindings_headers": + if mode == "headers": headers = [str(cell) if cell is not None else "" for cell in cells] - if not headers: - continue - mode = "bindings" - continue - if mode == "bindings": - if not any(cell is not None for cell in cells): - continue - item = {headers[idx]: cells[idx] for idx in range(min(len(headers), len(cells)))} - if not item.get("entity") and not item.get("template_kind"): - continue - bindings.append( - { - "entity": item.get("entity"), - "template_kind": item.get("template_kind"), - "handler": item.get("handler"), - "description": item.get("description"), - "default_sheet_name": item.get("default_sheet_name"), - "default_file_name": item.get("default_file_name"), - "title": item.get("title"), - "meta": {}, - } - ) + mode = "rows" continue - if mode == "placeholders": - if not any(cell is not None for cell in cells): - continue - placeholders.append( - { - "key": cells[0], - "label": cells[1], - "description": cells[2], - "example": cells[3], - "required": bool(cells[4]) if len(cells) > 4 else False, - } - ) + if mode != "rows" or not any(cell is not None for cell in cells): continue - if mode == "fields_headers": - headers = [str(cell) if cell is not None else "" for cell in cells] - if not headers: - continue - mode = "fields" + item = {headers[idx]: cells[idx] for idx in range(min(len(headers), len(cells)))} + if item.get("key") is None: continue - if mode == "fields": - if not any(cell is not None for cell in cells): - continue - item = {headers[idx]: cells[idx] for idx in range(min(len(headers), len(cells)))} - if item.get("key") is None and item.get("label") is None: - continue - fields.append( - { - "key": item.get("key"), - "label": item.get("label"), - "placeholder": item.get("placeholder"), - "required": bool(item.get("required", False)), - "example": item.get("example"), - "format": item.get("format"), - "source": item.get("source"), - "target": item.get("target"), - "options": [], - "meta": {}, - } - ) - return bindings, placeholders, fields + variables.append( + { + "key": item.get("key"), + "label": item.get("label") or item.get("key"), + "header": item.get("header"), + "required": bool(item.get("required", False)), + "example": item.get("example"), + "description": item.get("description"), + } + ) + return variables @dataclass @@ -260,53 +129,69 @@ class ExcelWorkbookCodec: ) return buffer.getvalue() - def import_rows(self, content: bytes) -> list[dict[str, Any]]: + def import_rows( + self, + content: bytes, + *, + header_row: int = 1, + data_start_row: int | None = None, + ) -> list[dict[str, Any]]: dataframe = self._read_sheet(content) if dataframe.empty and len(dataframe.columns) == 0: return [] - headers = [self._header_name(value) for value in dataframe.iloc[0].tolist()] - return self._frame_to_records(dataframe.iloc[1:], headers) + header_index = max(header_row - 1, 0) + if header_index >= len(dataframe): + return [] + headers = [self._header_name(value) for value in dataframe.iloc[header_index].tolist()] + start_index = max((data_start_row or header_row + 1) - 1, 0) + return self._frame_to_records(dataframe.iloc[start_index:], headers) - def import_rows_with_fields( + def import_rows_with_variables( self, content: bytes, *, - fields: list[ExchangeField], + variables: list[ExchangeVariable], + header_row: int = 1, + data_start_row: int | None = None, ) -> list[dict[str, Any]]: dataframe = self._read_sheet(content) if dataframe.empty and len(dataframe.columns) == 0: return [] - header_map = {field.workbook_header(): field.import_target_key() for field in fields} - headers = [self._header_name(value) for value in dataframe.iloc[0].tolist()] - return self._frame_to_records(dataframe.iloc[1:], headers, header_map=header_map) + header_index = max(header_row - 1, 0) + if header_index >= len(dataframe): + return [] + header_map = {variable.workbook_header(): variable.key for variable in variables} + headers = [self._header_name(value) for value in dataframe.iloc[header_index].tolist()] + start_index = max((data_start_row or header_row + 1) - 1, 0) + return self._frame_to_records(dataframe.iloc[start_index:], headers, header_map=header_map) - def export_rows_with_template( + def export_rows_with_variables( self, *, - fields: list[ExchangeField], + variables: list[ExchangeVariable], rows: list[dict[str, Any]], sheet_name: str = "Export", ) -> bytes: - headers = [field.workbook_header() for field in fields] + headers = [variable.workbook_header() for variable in variables] normalized_rows: list[dict[str, Any]] = [] for row in rows: item: dict[str, Any] = {} - for field in fields: - item[field.workbook_header()] = row.get(field.export_source_key()) + for variable in variables: + item[variable.workbook_header()] = row.get(variable.key) normalized_rows.append(item) return self.export_rows(headers, normalized_rows, sheet_name=sheet_name) def export_rows_with_plan( self, *, - plan: ExchangePlan, + plan: ExchangeTemplatePlan, rows: list[dict[str, Any]], sheet_name: str | None = None, ) -> bytes: - return self.export_rows_with_template( - fields=list(plan.fields), + return self.export_rows_with_variables( + variables=list(plan.variables), rows=rows, - sheet_name=sheet_name or plan.sheet_name or "Export", + sheet_name=sheet_name or plan.layout.sheet_name or "Export", ) def _read_sheet(self, content: bytes) -> pd.DataFrame: @@ -331,13 +216,18 @@ class ExcelWorkbookCodec: result: list[dict[str, Any]] = [] for values in dataframe.itertuples(index=False, name=None): item: dict[str, Any] = {} + has_value = False for index, header in enumerate(headers): if not header: continue key = header_map.get(header, header) if header_map is not None else header value = values[index] if index < len(values) else None - item[key] = self._normalize_value(value) - result.append(item) + normalized = self._normalize_value(value) + if normalized is not None: + has_value = True + item[key] = normalized + if has_value: + result.append(item) return result @staticmethod @@ -356,7 +246,3 @@ def _safe_sheet_name(value: str) -> str: if not cleaned: cleaned = "Sheet" return cleaned[:31] - - -def _enum_value(value: Any) -> Any: - return value.value if hasattr(value, "value") else value diff --git a/iti/exchange/models.py b/iti/exchange/models.py index b7a82d5..a1a8375 100644 --- a/iti/exchange/models.py +++ b/iti/exchange/models.py @@ -11,16 +11,33 @@ from iti.db import Base, IdMixin, TimestampMixin class ExchangeTemplateModel(Base, IdMixin, TimestampMixin): __tablename__ = "exchange_templates" + __table_args__ = ( + UniqueConstraint( + "biz_domain", + "biz_obj", + "operation", + "code", + name="uq_exchange_templates_scope_code", + ), + Index( + "ix_exchange_templates_scope", + "biz_domain", + "biz_obj", + "operation", + ), + ) code: Mapped[str] = mapped_column(String(128), unique=True, index=True, comment="模板编码") name: Mapped[str] = mapped_column(String(255), comment="模板名称") - template_kind: Mapped[str] = mapped_column(String(32), index=True, comment="模板类型") - entity: Mapped[str] = mapped_column(String(128), index=True, comment="业务实体") + biz_domain: Mapped[str] = mapped_column(String(128), index=True, comment="业务域") + biz_obj: Mapped[str] = mapped_column(String(128), index=True, comment="业务对象") + operation: Mapped[str] = mapped_column(String(32), index=True, comment="业务操作") status: Mapped[str] = mapped_column(String(32), default="draft", index=True, comment="状态") description: Mapped[str | None] = mapped_column(Text, nullable=True, comment="说明") current_version: Mapped[str | None] = mapped_column( String(64), nullable=True, index=True, comment="当前版本" ) + layout: Mapped[dict[str, Any]] = mapped_column(JSON, default=dict, comment="布局标记") meta: Mapped[dict[str, Any]] = mapped_column(JSON, default=dict, comment="扩展配置") versions: Mapped[list["ExchangeTemplateVersionModel"]] = relationship( @@ -43,7 +60,9 @@ class ExchangeTemplateVersionModel(Base, IdMixin, TimestampMixin): comment="模板ID", ) version: Mapped[str] = mapped_column(String(64), comment="版本号") - template_kind: Mapped[str] = mapped_column(String(32), index=True, comment="模板类型") + biz_domain: Mapped[str] = mapped_column(String(128), index=True, comment="业务域") + biz_obj: Mapped[str] = mapped_column(String(128), index=True, comment="业务对象") + operation: Mapped[str] = mapped_column(String(32), index=True, comment="业务操作") published_at: Mapped[datetime | None] = mapped_column( DateTime, nullable=True, @@ -51,9 +70,8 @@ class ExchangeTemplateVersionModel(Base, IdMixin, TimestampMixin): ) file_key: Mapped[str | None] = mapped_column(String(512), nullable=True, comment="模板文件") checksum: Mapped[str | None] = mapped_column(String(128), nullable=True, comment="校验值") - bindings: Mapped[list[dict[str, Any]]] = mapped_column(JSON, default=list, comment="绑定配置") - fields: Mapped[list[dict[str, Any]]] = mapped_column(JSON, default=list, comment="字段定义") - placeholders: Mapped[list[dict[str, Any]]] = mapped_column(JSON, default=list, comment="占位符定义") + layout: Mapped[dict[str, Any]] = mapped_column(JSON, default=dict, comment="布局标记") + variables: Mapped[list[dict[str, Any]]] = mapped_column(JSON, default=list, comment="模板变量") meta: Mapped[dict[str, Any]] = mapped_column(JSON, default=dict, comment="扩展配置") template: Mapped["ExchangeTemplateModel"] = relationship(back_populates="versions") @@ -62,7 +80,14 @@ class ExchangeTemplateVersionModel(Base, IdMixin, TimestampMixin): class ExchangeTaskModel(Base, IdMixin, TimestampMixin): __tablename__ = "exchange_tasks" __table_args__ = ( - Index("ix_exchange_tasks_template_id_kind_status", "template_id", "task_kind", "status"), + Index( + "ix_exchange_tasks_scope_status", + "biz_domain", + "biz_obj", + "operation", + "status", + ), + Index("ix_exchange_tasks_template_id_status", "template_id", "status"), Index("ix_exchange_tasks_version_id", "template_version_id"), ) @@ -80,7 +105,9 @@ class ExchangeTaskModel(Base, IdMixin, TimestampMixin): index=True, comment="模板版本ID", ) - task_kind: Mapped[str] = mapped_column(String(32), index=True, comment="任务类型") + biz_domain: Mapped[str] = mapped_column(String(128), index=True, comment="业务域") + biz_obj: Mapped[str] = mapped_column(String(128), index=True, comment="业务对象") + operation: Mapped[str] = mapped_column(String(32), index=True, comment="业务操作") status: Mapped[str] = mapped_column(String(32), default="pending", index=True, comment="状态") requested_by: Mapped[str | None] = mapped_column(String(36), nullable=True, index=True, comment="发起人") storage_key: Mapped[str | None] = mapped_column(String(512), nullable=True, comment="任务文件") diff --git a/iti/exchange/plan.py b/iti/exchange/plan.py index 0ce00bf..92d0377 100644 --- a/iti/exchange/plan.py +++ b/iti/exchange/plan.py @@ -3,41 +3,34 @@ from __future__ import annotations from dataclasses import dataclass from typing import Any -from .base import ( - ExchangeField, - ExchangePlaceholder, - ExchangePlan, - ExchangeTemplateBinding, - ExchangeTemplateKind, -) +from .base import ExchangeOperation, ExchangeTemplateLayout, ExchangeTemplatePlan, ExchangeVariable @dataclass(frozen=True) -class ExchangeMappingPlanInput: - template_kind: ExchangeTemplateKind | str +class ExchangePlanInput: + biz_domain: str + biz_obj: str + operation: ExchangeOperation | str template_id: str | None = None version_id: str | None = None version: str | None = None - bindings: list[ExchangeTemplateBinding] | None = None - fields: list[ExchangeField] | None = None - placeholders: list[ExchangePlaceholder] | None = None - title: str | None = None + code: str | None = None + name: str | None = None description: str | None = None - sheet_name: str | None = None - meta: dict[str, Any] | None = None + layout: ExchangeTemplateLayout | dict[str, Any] | None = None + variables: list[ExchangeVariable] | None = None - def to_plan(self) -> ExchangePlan: - return ExchangePlan.from_mapping( - template_kind=self.template_kind, + def to_plan(self) -> ExchangeTemplatePlan: + return ExchangeTemplatePlan.from_mapping( + biz_domain=self.biz_domain, + biz_obj=self.biz_obj, + operation=self.operation, template_id=self.template_id, version_id=self.version_id, version=self.version, - bindings=self.bindings, - fields=self.fields, - placeholders=self.placeholders, - title=self.title, + code=self.code, + name=self.name, description=self.description, - sheet_name=self.sheet_name, - meta=self.meta, + layout=self.layout, + variables=self.variables, ) - diff --git a/iti/exchange/registry.py b/iti/exchange/registry.py index f4f804f..dbf7dde 100644 --- a/iti/exchange/registry.py +++ b/iti/exchange/registry.py @@ -1,63 +1,125 @@ from __future__ import annotations from dataclasses import dataclass, field +from typing import Any -from .base import ExchangeTemplate, ExchangeTemplateSnapshot +from .base import ExchangeBusinessSpec, ExchangeScope, ExchangeTaskHandler @dataclass class ExchangeRegistry: - templates: dict[str, ExchangeTemplate] = field(default_factory=dict) - versions: dict[str, ExchangeTemplateSnapshot] = field(default_factory=dict) + specs: dict[str, ExchangeBusinessSpec] = field(default_factory=dict) + handlers: dict[str, ExchangeTaskHandler] = field(default_factory=dict) sources: dict[str, object] = field(default_factory=dict) - def register_template(self, template: ExchangeTemplate) -> ExchangeTemplate: - if not template.id: - raise ValueError("template id is required") - if not template.code: - raise ValueError("template code is required") - if template.id in self.templates: - raise ValueError(f"template already registered: {template.id}") - self.templates[template.id] = template - return template - - def register_version( - self, snapshot: ExchangeTemplateSnapshot - ) -> ExchangeTemplateSnapshot: - if not snapshot.id: - raise ValueError("snapshot id is required") - if not snapshot.template_id: - raise ValueError("snapshot template id is required") - key = self._version_key(snapshot.template_id, snapshot.version) - if key in self.versions: - raise ValueError(f"template version already registered: {key}") - self.versions[key] = snapshot - return snapshot - - def get_template(self, template_id: str) -> ExchangeTemplate | None: - return self.templates.get(template_id) - - def get_version( - self, template_id: str, version: str - ) -> ExchangeTemplateSnapshot | None: - return self.versions.get(self._version_key(template_id, version)) - - def latest_version( - self, template_id: str - ) -> ExchangeTemplateSnapshot | None: - template = self.templates.get(template_id) - if template is None or not template.current_version: + def register_spec( + self, + spec: ExchangeBusinessSpec, + *, + handler: ExchangeTaskHandler | None = None, + handler_name: str | None = None, + ) -> ExchangeBusinessSpec: + key = spec.scope.key() + if key in self.specs: + raise ValueError(f"exchange spec already registered: {key}") + resolved_handler_name = handler_name or spec.handler_name or key + if handler is not None: + self.register_handler(resolved_handler_name, handler) + spec = ExchangeBusinessSpec( + scope=spec.scope, + name=spec.name, + description=spec.description, + layout=spec.layout, + variables=spec.variables, + code=spec.code, + handler_name=resolved_handler_name, + ) + self.specs[key] = spec + return spec + + def get_spec( + self, + *, + biz_domain: str, + biz_obj: str, + operation: str, + ) -> ExchangeBusinessSpec | None: + return self.specs.get( + ExchangeScope.from_mapping( + biz_domain=biz_domain, + biz_obj=biz_obj, + operation=operation, + ).key() + ) + + def list_specs(self) -> list[ExchangeBusinessSpec]: + return sorted( + self.specs.values(), + key=lambda item: ( + item.scope.biz_domain, + item.scope.biz_obj, + str(item.scope.operation), + ), + ) + + def register_handler( + self, + name: str, + handler: ExchangeTaskHandler, + ) -> ExchangeTaskHandler: + if not name: + raise ValueError("handler name is required") + if name in self.handlers: + raise ValueError(f"exchange handler already registered: {name}") + self.handlers[name] = handler + return handler + + def get_handler(self, name: str) -> ExchangeTaskHandler | None: + return self.handlers.get(name) + + def get_scope_handler( + self, + *, + biz_domain: str, + biz_obj: str, + operation: str, + ) -> ExchangeTaskHandler | None: + spec = self.get_spec( + biz_domain=biz_domain, + biz_obj=biz_obj, + operation=operation, + ) + if spec is None: return None - return self.get_version(template_id, template.current_version) - - def list_templates(self) -> list[ExchangeTemplate]: - return sorted(self.templates.values(), key=lambda item: (item.entity, item.code)) - - def list_versions(self, template_id: str | None = None) -> list[ExchangeTemplateSnapshot]: - snapshots = list(self.versions.values()) - if template_id is not None: - snapshots = [item for item in snapshots if item.template_id == template_id] - return sorted(snapshots, key=lambda item: (item.template_id, item.version)) + return self.handlers.get(spec.handler_name or spec.scope.key()) + + def catalog(self) -> list[dict[str, Any]]: + domains: dict[str, dict[str, Any]] = {} + for spec in self.list_specs(): + domain = domains.setdefault( + spec.scope.biz_domain, + { + "biz_domain": spec.scope.biz_domain, + "objects": {}, + }, + ) + obj = domain["objects"].setdefault( + spec.scope.biz_obj, + { + "biz_obj": spec.scope.biz_obj, + "operations": [], + }, + ) + obj["operations"].append(spec.as_payload()) + result: list[dict[str, Any]] = [] + for domain in domains.values(): + result.append( + { + "biz_domain": domain["biz_domain"], + "objects": list(domain["objects"].values()), + } + ) + return result def register_source(self, name: str, source: object) -> object: if not name: @@ -70,10 +132,6 @@ class ExchangeRegistry: def get_source(self, name: str) -> object | None: return self.sources.get(name) - @staticmethod - def _version_key(template_id: str, version: str) -> str: - return f"{template_id}:{version}" - def get_exchange_registry(app) -> ExchangeRegistry: registry = getattr(app.state, "iti_exchange", None) @@ -83,6 +141,28 @@ def get_exchange_registry(app) -> ExchangeRegistry: return registry +def register_exchange_spec( + app, + spec: ExchangeBusinessSpec, + *, + handler: ExchangeTaskHandler | None = None, + handler_name: str | None = None, +) -> ExchangeBusinessSpec: + return get_exchange_registry(app).register_spec( + spec, + handler=handler, + handler_name=handler_name, + ) + + +def register_exchange_handler( + app, + name: str, + handler: ExchangeTaskHandler, +) -> ExchangeTaskHandler: + return get_exchange_registry(app).register_handler(name, handler) + + def register_exchange_source(app, name: str, source: object) -> object: return get_exchange_registry(app).register_source(name, source) diff --git a/iti/exchange/routes.py b/iti/exchange/routes.py index 45e6f08..d5d8929 100644 --- a/iti/exchange/routes.py +++ b/iti/exchange/routes.py @@ -1,8 +1,6 @@ from __future__ import annotations -from dataclasses import asdict - -from fastapi import APIRouter, Depends, File, Request, UploadFile +from fastapi import APIRouter, Depends, File, Query, Request, UploadFile from fastapi.responses import StreamingResponse from sqlalchemy.orm import Session @@ -10,14 +8,11 @@ from iti.db import get_db from iti.exceptions import BizError from iti.responses import ok, raw_response -from .base import ExchangeField, ExchangePlaceholder, ExchangeTemplateBinding, ExchangePlan +from .base import ExchangeTemplateSourceKind, ExchangeVariable +from .registry import get_exchange_registry from .schemas import ( - ExchangeFieldSchema, ExchangePlanResolveRequest, ExchangePlanTemplateFileRequest, - ExchangePlaceholderSchema, - ExchangeTemplateBindingSchema, - ExchangeTemplateSourceKind, ExchangeTaskCreateRequest, ExchangeTaskResponse, ExchangeTemplateCreateRequest, @@ -38,11 +33,13 @@ def _template_payload(item): "id": item.id, "code": item.code, "name": item.name, - "template_kind": item.template_kind, - "entity": item.entity, + "biz_domain": item.biz_domain, + "biz_obj": item.biz_obj, + "operation": item.operation, "status": item.status, "description": item.description, "current_version": item.current_version, + "layout": item.layout, "meta": item.meta, "created_at": item.created_at, "updated_at": item.updated_at, @@ -54,13 +51,14 @@ def _version_payload(item): "id": item.id, "template_id": item.template_id, "version": item.version, - "template_kind": item.template_kind, + "biz_domain": item.biz_domain, + "biz_obj": item.biz_obj, + "operation": item.operation, "published_at": item.published_at, "file_key": item.file_key, "checksum": item.checksum, - "bindings": item.bindings, - "fields": item.fields, - "placeholders": item.placeholders, + "layout": item.layout, + "variables": item.variables, "meta": item.meta, "created_at": item.created_at, "updated_at": item.updated_at, @@ -72,7 +70,9 @@ def _task_payload(item): "id": item.id, "template_id": item.template_id, "template_version_id": item.template_version_id, - "task_kind": item.task_kind, + "biz_domain": item.biz_domain, + "biz_obj": item.biz_obj, + "operation": item.operation, "status": item.status, "requested_by": item.requested_by, "storage_key": item.storage_key, @@ -90,63 +90,16 @@ def _task_payload(item): } -def _plan_payload(plan: ExchangePlan): - return { - "template_kind": plan.template_kind, - "template_id": plan.template_id, - "version_id": plan.version_id, - "version": plan.version, - "bindings": _plan_schema_items(plan.bindings), - "fields": _plan_schema_items(plan.fields), - "placeholders": _plan_schema_items(plan.placeholders), - "title": plan.title, - "description": plan.description, - "sheet_name": plan.sheet_name, - "meta": plan.meta, - } - - -def _plan_schema_items(items): - return [asdict(item) for item in items] +def _plan_payload(plan): + return plan.as_payload() -def _binding_from_payload(item): - return ExchangeTemplateBinding( - entity=item.get("entity"), - template_kind=item.get("template_kind"), - handler=item.get("handler"), - description=item.get("description"), - default_sheet_name=item.get("default_sheet_name"), - default_file_name=item.get("default_file_name"), - title=item.get("title"), - meta=item.get("meta") or {}, - ) - - -def _field_from_payload(item): - return ExchangeField( - key=item.get("key"), - label=item.get("label"), - placeholder=item.get("placeholder"), - required=bool(item.get("required", False)), - example=item.get("example"), - width=item.get("width"), - format=item.get("format"), - source=item.get("source"), - target=item.get("target"), - options=tuple(tuple(option) for option in item.get("options") or []), - meta=item.get("meta") or {}, - ) +def _variable_from_schema(item) -> ExchangeVariable: + return ExchangeVariable(**item.model_dump()) -def _placeholder_from_payload(item): - return ExchangePlaceholder( - key=item.get("key"), - label=item.get("label"), - description=item.get("description"), - required=bool(item.get("required", False)), - example=item.get("example"), - ) +def _layout_payload(layout): + return layout.model_dump() if layout is not None else None def _resolve_source(payload, request: Request, db: Session): @@ -167,18 +120,72 @@ def _resolve_source(payload, request: Request, db: Session): source_kind=source_kind, service_name=payload.source_service or "exchange", ) - if source_kind == ExchangeTemplateSourceKind.MAPPING or source_kind is None: + if source_kind is None: + return None + if source_kind == ExchangeTemplateSourceKind.MAPPING: return get_exchange_source(request.app, source_kind=ExchangeTemplateSourceKind.MAPPING) return get_exchange_source(request.app, source_kind=source_kind, db=db) +@router.get("/catalog") +def exchange_catalog(request: Request): + return ok(get_exchange_registry(request.app).catalog()) + + +@router.get("/scopes/{biz_domain}/{biz_obj}/{operation}") +def get_scope_spec( + biz_domain: str, + biz_obj: str, + operation: str, + request: Request, +): + spec = get_exchange_registry(request.app).get_spec( + biz_domain=biz_domain, + biz_obj=biz_obj, + operation=operation, + ) + if spec is None: + raise BizError("业务导入导出规格不存在", code=404) + return ok(spec.as_payload()) + + +@router.get("/templates/code") +def generate_template_code( + biz_domain: str, + biz_obj: str, + operation: str, + request: Request, + db: Session = Depends(get_db), +): + service = ExchangeService(request.app, db) + return ok( + { + "code": service.generate_template_code( + biz_domain=biz_domain, + biz_obj=biz_obj, + operation=operation, + ) + } + ) + + @router.get("/templates") -def list_templates(request: Request, db: Session = Depends(get_db)): +def list_templates( + request: Request, + biz_domain: str | None = Query(default=None), + biz_obj: str | None = Query(default=None), + operation: str | None = Query(default=None), + db: Session = Depends(get_db), +): service = ExchangeService(request.app, db) return ok( [ ExchangeTemplateResponse.model_validate(_template_payload(item)).model_dump(mode="json") - for item in service.list_templates() + for item in service.list_templates( + biz_domain=biz_domain, + biz_obj=biz_obj, + operation=operation, + ) ] ) @@ -200,9 +207,11 @@ def create_template( template = service.create_template( code=payload.code, name=payload.name, - template_kind=payload.template_kind, - entity=payload.entity, + biz_domain=payload.biz_domain, + biz_obj=payload.biz_obj, + operation=payload.operation, description=payload.description, + layout=_layout_payload(payload.layout), meta=payload.meta, ) return ok(ExchangeTemplateResponse.model_validate(_template_payload(template)).model_dump(mode="json")) @@ -222,11 +231,23 @@ def update_template( description=payload.description, status=payload.status, current_version=payload.current_version, + layout=_layout_payload(payload.layout), meta=payload.meta, ) return ok(ExchangeTemplateResponse.model_validate(_template_payload(template)).model_dump(mode="json")) +@router.delete("/templates/{template_id}") +def delete_template( + template_id: str, + request: Request, + db: Session = Depends(get_db), +): + service = ExchangeService(request.app, db) + service.delete_template(template_id) + return ok() + + @router.get("/templates/{template_id}/versions") def list_versions( template_id: str, @@ -256,22 +277,18 @@ def get_version( return ok(ExchangeTemplateVersionResponse.model_validate(_version_payload(version)).model_dump(mode="json")) -@router.get("/tasks") -def list_tasks(request: Request, db: Session = Depends(get_db)): - service = ExchangeService(request.app, db) - return ok( - [ - ExchangeTaskResponse.model_validate(_task_payload(item)).model_dump(mode="json") - for item in service.list_tasks() - ] - ) - - -@router.get("/tasks/{task_id}") -def get_task(task_id: str, request: Request, db: Session = Depends(get_db)): +@router.get("/templates/{template_id}/versions/by-version/{version}") +def get_version_by_value( + template_id: str, + version: str, + request: Request, + db: Session = Depends(get_db), +): service = ExchangeService(request.app, db) - task = service.get_task_or_404(task_id) - return ok(ExchangeTaskResponse.model_validate(_task_payload(task)).model_dump(mode="json")) + snapshot = service.get_snapshot(template_id=template_id, version=version) + if snapshot is None: + raise BizError("模板版本不存在", code=404) + return ok(snapshot.as_payload()) @router.post("/templates/{template_id}/versions") @@ -285,19 +302,44 @@ def publish_version( version = service.publish_version( template_id=template_id, version=payload.version, - bindings=[ - ExchangeTemplateBinding(**item.model_dump()) for item in payload.bindings - ], - fields=[ExchangeField(**item.model_dump()) for item in payload.fields], - placeholders=[ - ExchangePlaceholder(**item.model_dump()) for item in payload.placeholders - ], + variables=[ + _variable_from_schema(item) for item in payload.variables + ] if payload.variables is not None else None, + layout=_layout_payload(payload.layout), meta=payload.meta, make_current=payload.make_current, ) return ok(ExchangeTemplateVersionResponse.model_validate(_version_payload(version)).model_dump(mode="json")) +@router.delete("/templates/{template_id}/versions/{version_id}") +def delete_version( + template_id: str, + version_id: str, + request: Request, + db: Session = Depends(get_db), +): + service = ExchangeService(request.app, db) + version = service.get_version_or_404(version_id) + if version.template_id != template_id: + raise BizError("模板版本不存在", code=404) + service.delete_version(version_id) + return ok() + + +@router.get("/template-versions/{version_id}") +def get_template_version_by_id( + version_id: str, + request: Request, + db: Session = Depends(get_db), +): + service = ExchangeService(request.app, db) + snapshot = service.get_snapshot_by_version_id(version_id) + if snapshot is None: + raise BizError("模板版本不存在", code=404) + return ok(snapshot.as_payload()) + + @router.get("/template-versions/{version_id}/download") @raw_response def download_template_version( @@ -314,6 +356,34 @@ def download_template_version( ) +@router.post("/templates/{template_id}/versions/upload") +def upload_template_version( + template_id: str, + version: str, + request: Request, + file: UploadFile = File(...), + db: Session = Depends(get_db), +): + service = ExchangeService(request.app, db) + content = file.file.read() + parsed = _excel_template_codec().load(content) + snapshot = service.publish_version( + template_id=template_id, + version=version, + variables=[_variable_from_payload(item) for item in parsed.get("variables", [])], + layout={ + "title": parsed.get("title"), + "sheet_name": parsed.get("sheet_name"), + }, + meta={ + "source_file": file.filename, + }, + file_content=content, + file_name=file.filename, + ) + return ok(ExchangeTemplateVersionResponse.model_validate(_version_payload(snapshot)).model_dump(mode="json")) + + @router.post("/plans/resolve") def resolve_plan( payload: ExchangePlanResolveRequest, @@ -323,17 +393,19 @@ def resolve_plan( service = ExchangeService(request.app, db) source = _resolve_source(payload, request, db) plan = service.resolve_plan( - template_kind=payload.template_kind, + biz_domain=payload.biz_domain, + biz_obj=payload.biz_obj, + operation=payload.operation, template_id=payload.template_id, version_id=payload.version_id, version=payload.version, - bindings=[ExchangeTemplateBinding(**item.model_dump()) for item in payload.bindings], - fields=[ExchangeField(**item.model_dump()) for item in payload.fields], - placeholders=[ExchangePlaceholder(**item.model_dump()) for item in payload.placeholders], - title=payload.title, + code=payload.code, + name=payload.name, description=payload.description, - sheet_name=payload.sheet_name, - meta=payload.meta, + layout=_layout_payload(payload.layout), + variables=[ + _variable_from_schema(item) for item in payload.variables + ] if payload.variables is not None else None, source=source, ) return ok(_plan_payload(plan)) @@ -349,17 +421,19 @@ def build_plan_template_file( service = ExchangeService(request.app, db) source = _resolve_source(payload, request, db) plan = service.resolve_plan( - template_kind=payload.template_kind, + biz_domain=payload.biz_domain, + biz_obj=payload.biz_obj, + operation=payload.operation, template_id=payload.template_id, version_id=payload.version_id, version=payload.version, - bindings=[ExchangeTemplateBinding(**item.model_dump()) for item in payload.bindings], - fields=[ExchangeField(**item.model_dump()) for item in payload.fields], - placeholders=[ExchangePlaceholder(**item.model_dump()) for item in payload.placeholders], - title=payload.title, + code=payload.code, + name=payload.name, description=payload.description, - sheet_name=payload.sheet_name, - meta=payload.meta, + layout=_layout_payload(payload.layout), + variables=[ + _variable_from_schema(item) for item in payload.variables + ] if payload.variables is not None else None, source=source, ) content = source.load_template_file(plan) if source is not None else None @@ -372,41 +446,22 @@ def build_plan_template_file( ) -@router.post("/templates/{template_id}/versions/upload") -def upload_template_version( - template_id: str, - version: str, - request: Request, - file: UploadFile = File(...), - db: Session = Depends(get_db), -): +@router.get("/tasks") +def list_tasks(request: Request, db: Session = Depends(get_db)): service = ExchangeService(request.app, db) - content = file.file.read() - parsed = _excel_template_codec().load(content) - snapshot = service.publish_version( - template_id=template_id, - version=version, - bindings=[_binding_from_payload(item) for item in parsed.get("bindings", [])], - fields=[_field_from_payload(item) for item in parsed.get("fields", [])], - placeholders=[ - _placeholder_from_payload(item) for item in parsed.get("placeholders", []) - ], - meta={ - "title": parsed.get("title"), - "description": parsed.get("description"), - "sheet_name": parsed.get("sheet_name"), - "source_file": file.filename, - }, - file_content=content, - file_name=file.filename, + return ok( + [ + ExchangeTaskResponse.model_validate(_task_payload(item)).model_dump(mode="json") + for item in service.list_tasks() + ] ) - return ok(ExchangeTemplateVersionResponse.model_validate(_version_payload(snapshot)).model_dump(mode="json")) - -def _excel_template_codec(): - from .excel import ExcelTemplateCodec - return ExcelTemplateCodec() +@router.get("/tasks/{task_id}") +def get_task(task_id: str, request: Request, db: Session = Depends(get_db)): + service = ExchangeService(request.app, db) + task = service.get_task_or_404(task_id) + return ok(ExchangeTaskResponse.model_validate(_task_payload(task)).model_dump(mode="json")) @router.post("/tasks") @@ -418,31 +473,41 @@ def create_task( service = ExchangeService(request.app, db) source = _resolve_source(payload, request, db) plan = service.resolve_plan( - template_kind=payload.task_kind, + biz_domain=payload.biz_domain, + biz_obj=payload.biz_obj, + operation=payload.operation, template_id=payload.template_id, version_id=payload.version_id, version=payload.version, - bindings=[ExchangeTemplateBinding(**item.model_dump()) for item in payload.bindings], - fields=[ExchangeField(**item.model_dump()) for item in payload.fields], - placeholders=[ExchangePlaceholder(**item.model_dump()) for item in payload.placeholders], - title=payload.title, + code=payload.code, + name=payload.name, description=payload.description, - sheet_name=payload.sheet_name, - meta=payload.meta, + layout=_layout_payload(payload.layout), + variables=[ + _variable_from_schema(item) for item in payload.variables + ] if payload.variables is not None else None, source=source, ) task = service.create_task( + biz_domain=plan.scope.biz_domain, + biz_obj=plan.scope.biz_obj, + operation=plan.scope.operation, template_id=plan.template_id, version_id=plan.version_id, version=plan.version, - task_kind=payload.task_kind, storage_key=payload.storage_key, input_payload=payload.input_payload, - meta=payload.meta, ) return ok(ExchangeTaskResponse.model_validate(_task_payload(task)).model_dump(mode="json")) +@router.post("/tasks/{task_id}/run") +def run_task(task_id: str, request: Request, db: Session = Depends(get_db)): + service = ExchangeService(request.app, db) + task = service.run_task(task_id) + return ok(ExchangeTaskResponse.model_validate(_task_payload(task)).model_dump(mode="json")) + + @router.get("/tasks/{task_id}/rows") def list_task_rows(task_id: str, request: Request, db: Session = Depends(get_db)): service = ExchangeService(request.app, db) @@ -460,3 +525,20 @@ def list_task_rows(task_id: str, request: Request, db: Session = Depends(get_db) for item in service.list_task_rows(task_id) ] ) + + +def _variable_from_payload(item: dict) -> ExchangeVariable: + return ExchangeVariable( + key=item.get("key"), + label=item.get("label") or item.get("key"), + header=item.get("header"), + description=item.get("description"), + required=bool(item.get("required", False)), + example=item.get("example"), + ) + + +def _excel_template_codec(): + from .excel import ExcelTemplateCodec + + return ExcelTemplateCodec() diff --git a/iti/exchange/schemas.py b/iti/exchange/schemas.py index 23e23ed..d497556 100644 --- a/iti/exchange/schemas.py +++ b/iti/exchange/schemas.py @@ -7,48 +7,39 @@ from pydantic import AliasChoices, Field from iti.schemas import ItiModel -from .base import ExchangeTemplateKind, ExchangeTemplateSourceKind, ExchangeTaskKind +from .base import ExchangeOperation, ExchangeTemplateSourceKind -class ExchangePlaceholderSchema(ItiModel): +class ExchangeVariableSchema(ItiModel): key: str label: str + header: str | None = None description: str | None = None required: bool = False example: str | None = None -class ExchangeFieldSchema(ItiModel): - key: str - label: str - placeholder: str | None = None - required: bool = False - example: str | None = None - width: int | None = None - format: str | None = None - source: str | None = None - target: str | None = None - options: list[tuple[str, str]] = Field(default_factory=list) - meta: dict[str, Any] = Field(default_factory=dict) +class ExchangeTemplateLayoutSchema(ItiModel): + title: str | None = None + sheet_name: str | None = None + title_row: int | None = 1 + header_row: int = 2 + data_start_row: int | None = None -class ExchangeTemplateBindingSchema(ItiModel): - entity: str - template_kind: ExchangeTemplateKind - handler: str | None = None - description: str | None = None - default_sheet_name: str | None = None - default_file_name: str | None = None - title: str | None = None - meta: dict[str, Any] = Field(default_factory=dict) +class ExchangeScopeSchema(ItiModel): + biz_domain: str + biz_obj: str + operation: ExchangeOperation = Field( + validation_alias=AliasChoices("operation", "templateKind", "taskKind") + ) -class ExchangeTemplateCreateRequest(ItiModel): - code: str +class ExchangeTemplateCreateRequest(ExchangeScopeSchema): + code: str | None = None name: str - template_kind: ExchangeTemplateKind - entity: str description: str | None = None + layout: ExchangeTemplateLayoutSchema | None = None meta: dict[str, Any] = Field(default_factory=dict) @@ -57,38 +48,34 @@ class ExchangeTemplateUpdateRequest(ItiModel): description: str | None = None status: str | None = None current_version: str | None = None + layout: ExchangeTemplateLayoutSchema | None = None meta: dict[str, Any] | None = None class ExchangeTemplateVersionCreateRequest(ItiModel): version: str - bindings: list[ExchangeTemplateBindingSchema] = Field(default_factory=list) - fields: list[ExchangeFieldSchema] = Field(default_factory=list) - placeholders: list[ExchangePlaceholderSchema] = Field(default_factory=list) + variables: list[ExchangeVariableSchema] | None = None + layout: ExchangeTemplateLayoutSchema | None = None meta: dict[str, Any] = Field(default_factory=dict) make_current: bool = True -class ExchangePlanRequest(ItiModel): +class ExchangePlanRequest(ExchangeScopeSchema): template_id: str | None = None version_id: str | None = None version: str | None = None + code: str | None = None source_kind: ExchangeTemplateSourceKind | None = None source_name: str | None = None source_service: str | None = None - bindings: list[ExchangeTemplateBindingSchema] = Field(default_factory=list) - fields: list[ExchangeFieldSchema] = Field(default_factory=list) - placeholders: list[ExchangePlaceholderSchema] = Field(default_factory=list) - title: str | None = None + name: str | None = None description: str | None = None - sheet_name: str | None = None - meta: dict[str, Any] = Field(default_factory=dict) + layout: ExchangeTemplateLayoutSchema | None = None + variables: list[ExchangeVariableSchema] | None = None class ExchangePlanResolveRequest(ExchangePlanRequest): - template_kind: ExchangeTemplateKind = Field( - validation_alias=AliasChoices("templateKind", "taskKind") - ) + pass class ExchangePlanTemplateFileRequest(ExchangePlanResolveRequest): @@ -96,20 +83,25 @@ class ExchangePlanTemplateFileRequest(ExchangePlanResolveRequest): class ExchangeTaskCreateRequest(ExchangePlanRequest): - task_kind: ExchangeTaskKind storage_key: str | None = None input_payload: dict[str, Any] = Field(default_factory=dict) +class ExchangeTaskRunRequest(ItiModel): + pass + + class ExchangeTemplateResponse(ItiModel): id: str code: str name: str - template_kind: str - entity: str + biz_domain: str + biz_obj: str + operation: str status: str description: str | None = None current_version: str | None = None + layout: dict[str, Any] = Field(default_factory=dict) meta: dict[str, Any] = Field(default_factory=dict) created_at: datetime updated_at: datetime @@ -119,13 +111,14 @@ class ExchangeTemplateVersionResponse(ItiModel): id: str template_id: str version: str - template_kind: str + biz_domain: str + biz_obj: str + operation: str published_at: datetime | None = None file_key: str | None = None checksum: str | None = None - bindings: list[dict[str, Any]] = Field(default_factory=list) - fields: list[dict[str, Any]] = Field(default_factory=list) - placeholders: list[dict[str, Any]] = Field(default_factory=list) + layout: dict[str, Any] = Field(default_factory=dict) + variables: list[dict[str, Any]] = Field(default_factory=list) meta: dict[str, Any] = Field(default_factory=dict) created_at: datetime updated_at: datetime @@ -135,7 +128,9 @@ class ExchangeTaskResponse(ItiModel): id: str template_id: str | None = None template_version_id: str | None = None - task_kind: str + biz_domain: str + biz_obj: str + operation: str status: str requested_by: str | None = None storage_key: str | None = None diff --git a/iti/exchange/service.py b/iti/exchange/service.py index 4263d77..da2395d 100644 --- a/iti/exchange/service.py +++ b/iti/exchange/service.py @@ -3,23 +3,24 @@ from __future__ import annotations import hashlib from dataclasses import asdict from datetime import datetime -from io import BytesIO from enum import Enum +from io import BytesIO from typing import Any -from sqlalchemy import select +from sqlalchemy import select, update from sqlalchemy.orm import Session from iti.exceptions import BizError from .base import ( - ExchangeField, - ExchangePlaceholder, - ExchangeTemplateBinding, - ExchangeTemplateKind, - ExchangePlan, + ExchangeOperation, + ExchangeScope, + ExchangeTaskContext, + ExchangeTaskResult, + ExchangeTemplateLayout, + ExchangeTemplatePlan, ExchangeTemplateSnapshot, - ExchangeTaskKind, + ExchangeVariable, ) from .models import ( ExchangeTaskModel, @@ -27,6 +28,7 @@ from .models import ( ExchangeTemplateModel, ExchangeTemplateVersionModel, ) +from .registry import get_exchange_registry from .tasks import get_exchange_storage @@ -35,22 +37,44 @@ class ExchangeService: self.app = app self.db = db + def generate_template_code( + self, + *, + biz_domain: str, + biz_obj: str, + operation: ExchangeOperation | str, + ) -> str: + return ExchangeScope.from_mapping( + biz_domain=biz_domain, + biz_obj=biz_obj, + operation=operation, + ).code() + def create_template( self, *, - code: str, name: str, - template_kind: ExchangeTemplateKind | str, - entity: str, + biz_domain: str, + biz_obj: str, + operation: ExchangeOperation | str, + code: str | None = None, description: str | None = None, + layout: ExchangeTemplateLayout | dict[str, Any] | None = None, meta: dict[str, Any] | None = None, ) -> ExchangeTemplateModel: + scope = ExchangeScope.from_mapping( + biz_domain=biz_domain, + biz_obj=biz_obj, + operation=operation, + ) template = ExchangeTemplateModel( - code=code, + code=code or scope.code(), name=name, - template_kind=_enum_value(template_kind), - entity=entity, + biz_domain=scope.biz_domain, + biz_obj=scope.biz_obj, + operation=_operation_value(scope.operation), description=description, + layout=asdict(_coerce_layout(layout)), meta=meta or {}, ) self.db.add(template) @@ -66,6 +90,7 @@ class ExchangeService: description: str | None = None, status: str | None = None, current_version: str | None = None, + layout: ExchangeTemplateLayout | dict[str, Any] | None = None, meta: dict[str, Any] | None = None, ) -> ExchangeTemplateModel: template = self.get_template_or_404(template_id) @@ -77,26 +102,59 @@ class ExchangeService: template.status = status if current_version is not None: template.current_version = current_version + if layout is not None: + template.layout = asdict(_coerce_layout(layout)) if meta is not None: template.meta = meta self.db.commit() self.db.refresh(template) return template + def delete_template(self, template_id: str) -> None: + template = self.get_template_or_404(template_id) + versions = list(template.versions) + version_ids = [version.id for version in versions] + storage = get_exchange_storage(self.app) + for version in versions: + if version.file_key: + storage.delete(version.file_key) + if version_ids: + self.db.execute( + update(ExchangeTaskModel) + .where(ExchangeTaskModel.template_version_id.in_(version_ids)) + .values(template_version_id=None) + ) + self.db.execute( + update(ExchangeTaskModel) + .where(ExchangeTaskModel.template_id == template.id) + .values(template_id=None, template_version_id=None) + ) + self.db.delete(template) + self.db.commit() + def publish_version( self, *, template_id: str, version: str, - bindings: list[ExchangeTemplateBinding] | None = None, - fields: list[ExchangeField] | None = None, - placeholders: list[ExchangePlaceholder] | None = None, + variables: list[ExchangeVariable] | None = None, + layout: ExchangeTemplateLayout | dict[str, Any] | None = None, file_content: bytes | None = None, file_name: str | None = None, meta: dict[str, Any] | None = None, make_current: bool = True, ) -> ExchangeTemplateVersionModel: template = self.get_template_or_404(template_id) + resolved_layout = _coerce_layout(layout if layout is not None else template.layout) + resolved_variables = variables + if resolved_variables is None: + spec = get_exchange_registry(self.app).get_spec( + biz_domain=template.biz_domain, + biz_obj=template.biz_obj, + operation=template.operation, + ) + resolved_variables = list(spec.variables) if spec is not None else [] + file_key = None checksum = None if file_content is not None: @@ -111,13 +169,14 @@ class ExchangeService: snapshot = ExchangeTemplateVersionModel( template_id=template.id, version=version, - template_kind=template.template_kind, + biz_domain=template.biz_domain, + biz_obj=template.biz_obj, + operation=template.operation, published_at=datetime.now(), file_key=file_key, checksum=checksum, - bindings=[_jsonable(asdict(item)) for item in bindings or []], - fields=[_jsonable(asdict(item)) for item in fields or []], - placeholders=[_jsonable(asdict(item)) for item in placeholders or []], + layout=asdict(resolved_layout), + variables=[_jsonable(asdict(item)) for item in resolved_variables], meta=meta or {}, ) self.db.add(snapshot) @@ -128,30 +187,74 @@ class ExchangeService: self.db.refresh(snapshot) return snapshot + def delete_version(self, version_id: str) -> None: + version = self.get_version_or_404(version_id) + template = self.get_template_or_404(version.template_id) + next_current = None + if template.current_version == version.version: + with self.db.no_autoflush: + next_current = self.db.scalar( + select(ExchangeTemplateVersionModel) + .where(ExchangeTemplateVersionModel.template_id == template.id) + .where(ExchangeTemplateVersionModel.id != version.id) + .order_by( + ExchangeTemplateVersionModel.created_at.desc(), + ExchangeTemplateVersionModel.updated_at.desc(), + ) + ) + if version.file_key: + storage = get_exchange_storage(self.app) + storage.delete(version.file_key) + self.db.execute( + update(ExchangeTaskModel) + .where(ExchangeTaskModel.template_version_id == version.id) + .values(template_version_id=None) + ) + self.db.delete(version) + if template.current_version == version.version: + template.current_version = next_current.version if next_current is not None else None + if next_current is None: + template.status = "draft" + self.db.commit() + def build_template_file(self, version_id: str) -> bytes: version = self.get_version_or_404(version_id) if version.file_key: storage = get_exchange_storage(self.app) with storage.download(version.file_key) as file_stream: return file_stream.read() - snapshot = self.snapshot_from_model(version) - return _excel_template_codec().dump(snapshot) + return _excel_template_codec().dump(self.snapshot_from_model(version)) - def build_plan_template_file(self, plan: ExchangePlan) -> bytes: + def build_plan_template_file(self, plan: ExchangeTemplatePlan) -> bytes: if plan.version_id: - version = self.get_snapshot_by_version_id(plan.version_id) + version = self.db.get(ExchangeTemplateVersionModel, plan.version_id) if version is not None and version.file_key: storage = get_exchange_storage(self.app) with storage.download(version.file_key) as file_stream: return file_stream.read() return _excel_template_codec().dump(plan) + def save_template_file( + self, + *, + template: ExchangeTemplateModel, + version: str, + content: bytes, + file_name: str | None = None, + ) -> str: + suffix = _safe_suffix(file_name or "template.xlsx") + digest = hashlib.sha256(content).hexdigest() + key = f"exchange/templates/{template.code}/{version}/{digest}.{suffix}" + storage = get_exchange_storage(self.app) + storage.upload(BytesIO(content), key, _excel_mime_type()) + return key + def export_rows( self, rows: list[dict[str, Any]], *, - plan: ExchangePlan | None = None, - fields: list[ExchangeField] | None = None, + plan: ExchangeTemplatePlan | None = None, + variables: list[ExchangeVariable] | None = None, sheet_name: str | None = None, ) -> bytes: workbook_codec = _excel_workbook_codec() @@ -161,9 +264,9 @@ class ExchangeService: rows=rows, sheet_name=sheet_name, ) - if fields is not None: - return workbook_codec.export_rows_with_template( - fields=fields, + if variables is not None: + return workbook_codec.export_rows_with_variables( + variables=variables, rows=rows, sheet_name=sheet_name or "Export", ) @@ -176,60 +279,49 @@ class ExchangeService: self, content: bytes, *, - plan: ExchangePlan | None = None, - fields: list[ExchangeField] | None = None, + plan: ExchangeTemplatePlan | None = None, + variables: list[ExchangeVariable] | None = None, ) -> list[dict[str, Any]]: workbook_codec = _excel_workbook_codec() - if plan is not None and plan.fields: - return workbook_codec.import_rows_with_fields(content, fields=list(plan.fields)) - if fields is not None: - return workbook_codec.import_rows_with_fields(content, fields=fields) + if plan is not None and plan.variables: + return workbook_codec.import_rows_with_variables( + content, + variables=list(plan.variables), + header_row=plan.layout.header_row, + data_start_row=plan.layout.data_start_row, + ) + if variables is not None: + return workbook_codec.import_rows_with_variables(content, variables=variables) return workbook_codec.import_rows(content) - def save_template_file( - self, - *, - template: ExchangeTemplateModel, - version: str, - content: bytes, - file_name: str | None = None, - ) -> str: - suffix = _safe_suffix(file_name or "template.xlsx") - key = f"exchange/templates/{template.code}/{version}/{hashlib.sha256(content).hexdigest()}.{suffix}" - storage = get_exchange_storage(self.app) - storage.upload(BytesIO(content), key, _excel_mime_type()) - return key - def create_task( self, *, + biz_domain: str, + biz_obj: str, + operation: ExchangeOperation | str, template_id: str | None = None, version_id: str | None = None, version: str | None = None, - task_kind: ExchangeTaskKind | str, requested_by: str | None = None, storage_key: str | None = None, input_payload: dict[str, Any] | None = None, meta: dict[str, Any] | None = None, ) -> ExchangeTaskModel: - template = self.get_template_or_404(template_id) if template_id else None - version_model = self.get_version_or_404(version_id) if version_id else None - if template is not None and version_model is not None and version_model.template_id != template.id: - raise BizError("模板版本不属于该模板", code=400) - if template is None and version_model is not None: - template = self.get_template_or_404(version_model.template_id) - if template is not None and version_model is None: - if version: - version_model = self.get_snapshot(template_id=template.id, version=version) - elif template.current_version: - version_model = self.get_snapshot( - template_id=template.id, - version=template.current_version, - ) + plan = self.resolve_plan( + biz_domain=biz_domain, + biz_obj=biz_obj, + operation=operation, + template_id=template_id, + version_id=version_id, + version=version, + ) task = ExchangeTaskModel( - template_id=template.id if template is not None else None, - template_version_id=version_model.id if version_model is not None else None, - task_kind=_enum_value(task_kind), + template_id=plan.template_id, + template_version_id=plan.version_id, + biz_domain=plan.scope.biz_domain, + biz_obj=plan.scope.biz_obj, + operation=_operation_value(plan.scope.operation), status="pending", requested_by=requested_by, storage_key=storage_key, @@ -241,6 +333,54 @@ class ExchangeService: self.db.refresh(task) return task + def run_task(self, task_id: str) -> ExchangeTaskModel: + task = self.mark_task_running(task_id) + try: + snapshot = ( + self.get_snapshot_by_version_id(task.template_version_id) + if task.template_version_id + else None + ) + plan = snapshot or self.resolve_plan( + biz_domain=task.biz_domain, + biz_obj=task.biz_obj, + operation=task.operation, + template_id=task.template_id, + ) + handler = get_exchange_registry(self.app).get_scope_handler( + biz_domain=task.biz_domain, + biz_obj=task.biz_obj, + operation=task.operation, + ) + if handler is None: + raise BizError("导入导出处理器未注册", code=404) + result = handler( + ExchangeTaskContext( + task_id=task.id, + plan=plan, + snapshot=snapshot, + storage_key=task.storage_key, + payload=task.input_payload, + requested_by=task.requested_by, + ) + ) + if isinstance(result, dict): + result = ExchangeTaskResult(**result) + return self.mark_task_finished( + task_id, + status="success", + message=result.message, + result_payload=result.result_payload, + success_count=result.success_count, + failed_count=result.failed_count, + ) + except Exception as exc: + return self.mark_task_finished( + task_id, + status="failed", + message=str(exc), + ) + def get_snapshot(self, *, template_id: str, version: str) -> ExchangeTemplateSnapshot | None: version_model = self.db.scalar( select(ExchangeTemplateVersionModel) @@ -251,7 +391,9 @@ class ExchangeService: return None return self.snapshot_from_model(version_model) - def get_snapshot_by_version_id(self, version_id: str) -> ExchangeTemplateSnapshot | None: + def get_snapshot_by_version_id(self, version_id: str | None) -> ExchangeTemplateSnapshot | None: + if version_id is None: + return None version_model = self.db.get(ExchangeTemplateVersionModel, version_id) if version_model is None: return None @@ -266,57 +408,77 @@ class ExchangeService: def resolve_plan( self, *, - template_kind: ExchangeTemplateKind | str, + biz_domain: str, + biz_obj: str, + operation: ExchangeOperation | str, template_id: str | None = None, version_id: str | None = None, version: str | None = None, - bindings: list[ExchangeTemplateBinding] | None = None, - fields: list[ExchangeField] | None = None, - placeholders: list[ExchangePlaceholder] | None = None, - title: str | None = None, + code: str | None = None, + name: str | None = None, description: str | None = None, - sheet_name: str | None = None, - meta: dict[str, Any] | None = None, + layout: ExchangeTemplateLayout | dict[str, Any] | None = None, + variables: list[ExchangeVariable] | None = None, source: Any | None = None, - ) -> ExchangePlan: + ) -> ExchangeTemplatePlan: if source is not None: return source.resolve_plan( - template_kind=template_kind, + biz_domain=biz_domain, + biz_obj=biz_obj, + operation=operation, template_id=template_id, version_id=version_id, version=version, - bindings=bindings, - fields=fields, - placeholders=placeholders, - title=title, + code=code, + name=name, description=description, - sheet_name=sheet_name, - meta=meta, + layout=layout, + variables=variables, ) if version_id: snapshot = self.get_snapshot_by_version_id(version_id) if snapshot is not None: - return snapshot.to_plan() + return snapshot if template_id and version: snapshot = self.get_snapshot(template_id=template_id, version=version) if snapshot is not None: - return snapshot.to_plan() + return snapshot if template_id: current = self.get_current_snapshot(template_id) if current is not None: - return current.to_plan() - return ExchangePlan.from_mapping( - template_kind=template_kind, - template_id=template_id, - version_id=version_id, - version=version, - bindings=bindings, - fields=fields, - placeholders=placeholders, - title=title, + return current + template = self.get_template_or_404(template_id) + return self.plan_from_template_model(template) + + template = self.get_template_by_scope( + biz_domain=biz_domain, + biz_obj=biz_obj, + operation=operation, + code=code, + ) + if template is not None: + current = self.get_current_snapshot(template.id) + if current is not None: + return current + return self.plan_from_template_model(template) + + spec = get_exchange_registry(self.app).get_spec( + biz_domain=biz_domain, + biz_obj=biz_obj, + operation=_operation_value(operation), + ) + if spec is not None: + return spec.to_plan() + + return ExchangeTemplatePlan.from_mapping( + biz_domain=biz_domain, + biz_obj=biz_obj, + operation=operation, + code=code, + name=name, description=description, - sheet_name=sheet_name, - meta=meta, + layout=layout, + variables=variables, ) def mark_task_running(self, task_id: str) -> ExchangeTaskModel: @@ -381,6 +543,29 @@ class ExchangeService: raise BizError("模板不存在", code=404) return template + def get_template_by_scope( + self, + *, + biz_domain: str, + biz_obj: str, + operation: ExchangeOperation | str, + code: str | None = None, + ) -> ExchangeTemplateModel | None: + statement = ( + select(ExchangeTemplateModel) + .where(ExchangeTemplateModel.biz_domain == biz_domain) + .where(ExchangeTemplateModel.biz_obj == biz_obj) + .where(ExchangeTemplateModel.operation == _operation_value(operation)) + ) + if code is not None: + statement = statement.where(ExchangeTemplateModel.code == code) + return self.db.scalar( + statement.order_by( + ExchangeTemplateModel.created_at.desc(), + ExchangeTemplateModel.updated_at.desc(), + ) + ) + def get_version_or_404(self, version_id: str) -> ExchangeTemplateVersionModel: version = self.db.get(ExchangeTemplateVersionModel, version_id) if version is None: @@ -393,11 +578,26 @@ class ExchangeService: raise BizError("导入导出任务不存在", code=404) return task - def list_templates(self) -> list[ExchangeTemplateModel]: + def list_templates( + self, + *, + biz_domain: str | None = None, + biz_obj: str | None = None, + operation: str | None = None, + ) -> list[ExchangeTemplateModel]: + statement = select(ExchangeTemplateModel) + if biz_domain is not None: + statement = statement.where(ExchangeTemplateModel.biz_domain == biz_domain) + if biz_obj is not None: + statement = statement.where(ExchangeTemplateModel.biz_obj == biz_obj) + if operation is not None: + statement = statement.where(ExchangeTemplateModel.operation == operation) return list( self.db.scalars( - select(ExchangeTemplateModel).order_by( - ExchangeTemplateModel.entity, + statement.order_by( + ExchangeTemplateModel.biz_domain, + ExchangeTemplateModel.biz_obj, + ExchangeTemplateModel.operation, ExchangeTemplateModel.code, ) ) @@ -408,7 +608,10 @@ class ExchangeService: self.db.scalars( select(ExchangeTemplateVersionModel) .where(ExchangeTemplateVersionModel.template_id == template_id) - .order_by(ExchangeTemplateVersionModel.version) + .order_by( + ExchangeTemplateVersionModel.created_at.desc(), + ExchangeTemplateVersionModel.updated_at.desc(), + ) ) ) @@ -427,65 +630,78 @@ class ExchangeService: ) ) + def plan_from_template_model(self, template: ExchangeTemplateModel) -> ExchangeTemplatePlan: + spec = get_exchange_registry(self.app).get_spec( + biz_domain=template.biz_domain, + biz_obj=template.biz_obj, + operation=template.operation, + ) + variables = tuple(spec.variables) if spec is not None else () + return ExchangeTemplatePlan( + scope=ExchangeScope.from_mapping( + biz_domain=template.biz_domain, + biz_obj=template.biz_obj, + operation=template.operation, + ), + code=template.code, + name=template.name, + description=template.description, + template_id=template.id, + version=template.current_version, + layout=_coerce_layout(template.layout), + variables=variables, + ) + def snapshot_from_model( self, version: ExchangeTemplateVersionModel ) -> ExchangeTemplateSnapshot: + template = self.get_template_or_404(version.template_id) return ExchangeTemplateSnapshot( - id=version.id, - version=version.version, + scope=ExchangeScope.from_mapping( + biz_domain=version.biz_domain, + biz_obj=version.biz_obj, + operation=version.operation, + ), + code=template.code, + name=template.name, + description=template.description, template_id=version.template_id, - template_kind=ExchangeTemplateKind(version.template_kind), - bindings=tuple(_binding_from_dict(item) for item in version.bindings), + version_id=version.id, + version=version.version, + layout=_coerce_layout(version.layout), + variables=tuple(_variable_from_dict(item) for item in version.variables), published_at=version.published_at.isoformat() if version.published_at else None, file_key=version.file_key, checksum=version.checksum, - fields=tuple(_field_from_dict(item) for item in version.fields), - placeholders=tuple(_placeholder_from_dict(item) for item in version.placeholders), - meta=version.meta, ) -def _field_from_dict(value: dict[str, Any]) -> ExchangeField: - options = value.get("options") or () - return ExchangeField( - key=value["key"], - label=value["label"], - placeholder=value.get("placeholder"), - required=bool(value.get("required", False)), - example=value.get("example"), - width=value.get("width"), - format=value.get("format"), - source=value.get("source"), - target=value.get("target"), - options=tuple(tuple(item) for item in options), - meta=value.get("meta") or {}, - ) - - -def _placeholder_from_dict(value: dict[str, Any]) -> ExchangePlaceholder: - return ExchangePlaceholder( +def _variable_from_dict(value: dict[str, Any]) -> ExchangeVariable: + return ExchangeVariable( key=value["key"], label=value["label"], + header=value.get("header"), description=value.get("description"), required=bool(value.get("required", False)), example=value.get("example"), ) -def _binding_from_dict(value: dict[str, Any]) -> ExchangeTemplateBinding: - return ExchangeTemplateBinding( - entity=value["entity"], - template_kind=ExchangeTemplateKind(value["template_kind"]), - handler=value.get("handler"), - description=value.get("description"), - default_sheet_name=value.get("default_sheet_name"), - default_file_name=value.get("default_file_name"), +def _coerce_layout(value: ExchangeTemplateLayout | dict[str, Any] | None) -> ExchangeTemplateLayout: + if value is None: + return ExchangeTemplateLayout() + if isinstance(value, ExchangeTemplateLayout): + return value + return ExchangeTemplateLayout( title=value.get("title"), - meta=value.get("meta") or {}, + sheet_name=value.get("sheet_name"), + title_row=value.get("title_row", 1), + header_row=int(value.get("header_row") or 2), + data_start_row=value.get("data_start_row"), ) -def _enum_value(value: Any) -> str: +def _operation_value(value: ExchangeOperation | str) -> str: return value.value if hasattr(value, "value") else str(value) diff --git a/iti/exchange/sources.py b/iti/exchange/sources.py index ed6e5c0..a966bf4 100644 --- a/iti/exchange/sources.py +++ b/iti/exchange/sources.py @@ -6,13 +6,11 @@ from typing import Any, Protocol from iti.service_client import ServiceClient, service_client from .base import ( - ExchangeField, - ExchangePlaceholder, - ExchangePlan, - ExchangeTemplateBinding, - ExchangeTemplateKind, - ExchangeTemplateSource, + ExchangeOperation, + ExchangeTemplateLayout, + ExchangeTemplatePlan, ExchangeTemplateSourceKind, + ExchangeVariable, ) @@ -20,22 +18,21 @@ class ExchangeSource(Protocol): def resolve_plan( self, *, - template_kind: ExchangeTemplateKind | str, + biz_domain: str, + biz_obj: str, + operation: ExchangeOperation | str, template_id: str | None = None, version_id: str | None = None, version: str | None = None, - bindings: list[ExchangeTemplateBinding] | None = None, - fields: list[ExchangeField] | None = None, - placeholders: list[ExchangePlaceholder] | None = None, - title: str | None = None, + code: str | None = None, + name: str | None = None, description: str | None = None, - sheet_name: str | None = None, - meta: dict[str, Any] | None = None, - source: ExchangeTemplateSource | None = None, - ) -> ExchangePlan: + layout: ExchangeTemplateLayout | dict[str, Any] | None = None, + variables: list[ExchangeVariable] | None = None, + ) -> ExchangeTemplatePlan: ... - def load_template_file(self, plan: ExchangePlan) -> bytes | None: + def load_template_file(self, plan: ExchangeTemplatePlan) -> bytes | None: ... @@ -44,36 +41,33 @@ class MappingExchangeSource: def resolve_plan( self, *, - template_kind: ExchangeTemplateKind | str, + biz_domain: str, + biz_obj: str, + operation: ExchangeOperation | str, template_id: str | None = None, version_id: str | None = None, version: str | None = None, - bindings: list[ExchangeTemplateBinding] | None = None, - fields: list[ExchangeField] | None = None, - placeholders: list[ExchangePlaceholder] | None = None, - title: str | None = None, + code: str | None = None, + name: str | None = None, description: str | None = None, - sheet_name: str | None = None, - meta: dict[str, Any] | None = None, - source: ExchangeTemplateSource | None = None, - ) -> ExchangePlan: - if source is not None: - return source.to_plan() - return ExchangePlan.from_mapping( - template_kind=template_kind, + layout: ExchangeTemplateLayout | dict[str, Any] | None = None, + variables: list[ExchangeVariable] | None = None, + ) -> ExchangeTemplatePlan: + return ExchangeTemplatePlan.from_mapping( + biz_domain=biz_domain, + biz_obj=biz_obj, + operation=operation, template_id=template_id, version_id=version_id, version=version, - bindings=bindings, - fields=fields, - placeholders=placeholders, - title=title, + code=code, + name=name, description=description, - sheet_name=sheet_name, - meta=meta, + layout=layout, + variables=variables, ) - def load_template_file(self, plan: ExchangePlan) -> bytes | None: + def load_template_file(self, plan: ExchangeTemplatePlan) -> bytes | None: return _excel_template_codec().dump(plan) @@ -85,47 +79,35 @@ class LocalExchangeSource: def resolve_plan( self, *, - template_kind: ExchangeTemplateKind | str, + biz_domain: str, + biz_obj: str, + operation: ExchangeOperation | str, template_id: str | None = None, version_id: str | None = None, version: str | None = None, - bindings: list[ExchangeTemplateBinding] | None = None, - fields: list[ExchangeField] | None = None, - placeholders: list[ExchangePlaceholder] | None = None, - title: str | None = None, + code: str | None = None, + name: str | None = None, description: str | None = None, - sheet_name: str | None = None, - meta: dict[str, Any] | None = None, - source: ExchangeTemplateSource | None = None, - ) -> ExchangePlan: + layout: ExchangeTemplateLayout | dict[str, Any] | None = None, + variables: list[ExchangeVariable] | None = None, + ) -> ExchangeTemplatePlan: from .service import ExchangeService - service = ExchangeService(self.app, self.db) - if source is not None: - return source.to_plan() - if version_id: - snapshot = service.get_snapshot_by_version_id(version_id) - if snapshot is not None: - return snapshot.to_plan() - if template_id: - snapshot = service.get_current_snapshot(template_id) - if snapshot is not None: - return snapshot.to_plan() - return ExchangePlan.from_mapping( - template_kind=template_kind, + return ExchangeService(self.app, self.db).resolve_plan( + biz_domain=biz_domain, + biz_obj=biz_obj, + operation=operation, template_id=template_id, version_id=version_id, version=version, - bindings=bindings, - fields=fields, - placeholders=placeholders, - title=title, + code=code, + name=name, description=description, - sheet_name=sheet_name, - meta=meta, + layout=layout, + variables=variables, ) - def load_template_file(self, plan: ExchangePlan) -> bytes | None: + def load_template_file(self, plan: ExchangeTemplatePlan) -> bytes | None: if plan.version_id: from .service import ExchangeService @@ -141,55 +123,46 @@ class RemoteExchangeSource: def resolve_plan( self, *, - template_kind: ExchangeTemplateKind | str, + biz_domain: str, + biz_obj: str, + operation: ExchangeOperation | str, template_id: str | None = None, version_id: str | None = None, version: str | None = None, - bindings: list[ExchangeTemplateBinding] | None = None, - fields: list[ExchangeField] | None = None, - placeholders: list[ExchangePlaceholder] | None = None, - title: str | None = None, + code: str | None = None, + name: str | None = None, description: str | None = None, - sheet_name: str | None = None, - meta: dict[str, Any] | None = None, - source: ExchangeTemplateSource | None = None, - ) -> ExchangePlan: - if source is not None: - return source.to_plan() + layout: ExchangeTemplateLayout | dict[str, Any] | None = None, + variables: list[ExchangeVariable] | None = None, + ) -> ExchangeTemplatePlan: client = service_client(self.app, self.service_name) - payload = self._fetch_plan(client, template_id=template_id, version=version, version_id=version_id) + payload = self._fetch_plan( + client, + biz_domain=biz_domain, + biz_obj=biz_obj, + operation=operation, + template_id=template_id, + version=version, + version_id=version_id, + code=code, + ) if payload is not None: - meta = payload.get("meta") or {} - return ExchangePlan.from_mapping( - template_kind=payload.get("template_kind") or template_kind, - template_id=payload.get("template_id") or template_id, - version_id=payload.get("id") or version_id, - version=payload.get("version") or version, - bindings=[_binding_from_mapping(item) for item in payload.get("bindings", [])], - fields=[_field_from_mapping(item) for item in payload.get("fields", [])], - placeholders=[ - _placeholder_from_mapping(item) for item in payload.get("placeholders", []) - ], - title=payload.get("title") or meta.get("title") or title, - description=payload.get("description") or meta.get("description") or description, - sheet_name=payload.get("sheet_name") or meta.get("sheet_name") or sheet_name, - meta=meta or payload.get("meta") or {}, - ) - return ExchangePlan.from_mapping( - template_kind=template_kind, + return _plan_from_mapping(payload) + return ExchangeTemplatePlan.from_mapping( + biz_domain=biz_domain, + biz_obj=biz_obj, + operation=operation, template_id=template_id, version_id=version_id, version=version, - bindings=bindings, - fields=fields, - placeholders=placeholders, - title=title, + code=code, + name=name, description=description, - sheet_name=sheet_name, - meta=meta, + layout=layout, + variables=variables, ) - def load_template_file(self, plan: ExchangePlan) -> bytes | None: + def load_template_file(self, plan: ExchangeTemplatePlan) -> bytes | None: if not plan.version_id: return _excel_template_codec().dump(plan) client = service_client(self.app, self.service_name) @@ -203,61 +176,71 @@ class RemoteExchangeSource: self, client: ServiceClient, *, + biz_domain: str, + biz_obj: str, + operation: ExchangeOperation | str, template_id: str | None, version: str | None, version_id: str | None, + code: str | None, ) -> dict[str, Any] | None: if version_id: return client.get(f"/exchange/template-versions/{version_id}") if template_id and version: - return client.get(f"/exchange/templates/{template_id}/versions/{version}") + return client.get(f"/exchange/templates/{template_id}/versions/by-version/{version}") if template_id: - template = client.get(f"/exchange/templates/{template_id}") - current_version = (template or {}).get("current_version") - if current_version: - return client.get(f"/exchange/templates/{template_id}/versions/{current_version}") - return None - - -def _binding_from_mapping(item: dict[str, Any]) -> ExchangeTemplateBinding: - return ExchangeTemplateBinding( - entity=item.get("entity"), - template_kind=item.get("template_kind") or item.get("templateKind"), - handler=item.get("handler"), - description=item.get("description"), - default_sheet_name=item.get("default_sheet_name") or item.get("defaultSheetName"), - default_file_name=item.get("default_file_name") or item.get("defaultFileName"), - title=item.get("title"), - meta=item.get("meta") or {}, - ) + return client.post( + "/exchange/plans/resolve", + json={ + "bizDomain": biz_domain, + "bizObj": biz_obj, + "operation": _operation_value(operation), + "templateId": template_id, + }, + ) + return client.post( + "/exchange/plans/resolve", + json={ + "bizDomain": biz_domain, + "bizObj": biz_obj, + "operation": _operation_value(operation), + "code": code, + }, + ) -def _field_from_mapping(item: dict[str, Any]) -> ExchangeField: - return ExchangeField( - key=item.get("key"), - label=item.get("label"), - placeholder=item.get("placeholder"), - required=bool(item.get("required", False)), - example=item.get("example"), - width=item.get("width"), - format=item.get("format"), - source=item.get("source"), - target=item.get("target"), - options=tuple(tuple(option) for option in item.get("options") or []), - meta=item.get("meta") or {}, +def _plan_from_mapping(item: dict[str, Any]) -> ExchangeTemplatePlan: + scope = item.get("scope") or {} + return ExchangeTemplatePlan.from_mapping( + biz_domain=item.get("biz_domain") or item.get("bizDomain") or scope.get("biz_domain") or scope.get("bizDomain"), + biz_obj=item.get("biz_obj") or item.get("bizObj") or scope.get("biz_obj") or scope.get("bizObj"), + operation=item.get("operation") or scope.get("operation"), + template_id=item.get("template_id") or item.get("templateId"), + version_id=item.get("version_id") or item.get("versionId") or item.get("id"), + version=item.get("version"), + code=item.get("code"), + name=item.get("name"), + description=item.get("description"), + layout=item.get("layout"), + variables=[_variable_from_mapping(value) for value in item.get("variables", [])], ) -def _placeholder_from_mapping(item: dict[str, Any]) -> ExchangePlaceholder: - return ExchangePlaceholder( +def _variable_from_mapping(item: dict[str, Any]) -> ExchangeVariable: + return ExchangeVariable( key=item.get("key"), - label=item.get("label"), + label=item.get("label") or item.get("key"), + header=item.get("header"), description=item.get("description"), required=bool(item.get("required", False)), example=item.get("example"), ) +def _operation_value(value: ExchangeOperation | str) -> str: + return value.value if hasattr(value, "value") else str(value) + + def _excel_template_codec(): from .excel import ExcelTemplateCodec diff --git a/iti/exchange/tasks.py b/iti/exchange/tasks.py index 03f83c1..4ff85e1 100644 --- a/iti/exchange/tasks.py +++ b/iti/exchange/tasks.py @@ -1,29 +1,9 @@ from __future__ import annotations -from dataclasses import dataclass -from typing import Any - from iti.storage import StorageManager from iti.tasks import task_registry -from .base import ExchangeTaskKind - - -@dataclass(frozen=True) -class ExchangeTaskContext: - task_kind: ExchangeTaskKind - template_id: str - version: str - storage_key: str | None = None - payload: dict[str, Any] | None = None - - -@dataclass(frozen=True) -class ExchangeTaskResult: - success_count: int = 0 - failed_count: int = 0 - message: str | None = None - result_payload: dict[str, Any] | None = None +from .base import ExchangeTaskContext def register_exchange_task( diff --git a/tests/test_exchange.py b/tests/test_exchange.py index 3f89183..ed3685a 100644 --- a/tests/test_exchange.py +++ b/tests/test_exchange.py @@ -1,6 +1,5 @@ from __future__ import annotations -from io import BytesIO from dataclasses import dataclass import httpx @@ -10,22 +9,22 @@ from iti import create_app from iti.config import BaseConfig from iti.db import Base, reset_db from iti.exchange import ( - ExchangeField, - ExchangePlaceholder, - ExchangePlan, - ExchangeTemplateBinding, - ExchangeTemplateKind, - ExchangeTemplateSource, - ExchangeTemplateSourceKind, - LocalExchangeSource, + ExchangeBusinessSpec, + ExchangeOperation, + ExchangeScope, + ExchangeTaskContext, + ExchangeTaskResult, + ExchangeTemplateLayout, + ExchangeTemplatePlan, + ExchangeVariable, MappingExchangeSource, RemoteExchangeSource, get_exchange_registry, register_exchange_source, + register_exchange_spec, ) from iti.exchange.excel import ExcelTemplateCodec, ExcelWorkbookCodec from iti.exchange.service import ExchangeService -from iti.exchange.base import ExchangeTemplateSnapshot from iti.service_client import register_service_client @@ -40,6 +39,19 @@ def make_app(*, exchange_enabled: bool = True): return app +def user_import_spec() -> ExchangeBusinessSpec: + return ExchangeBusinessSpec( + scope=ExchangeScope("system", "user", ExchangeOperation.IMPORT), + name="用户导入", + description="导入系统用户", + layout=ExchangeTemplateLayout(title="用户导入", sheet_name="用户", header_row=2), + variables=( + ExchangeVariable(key="username", label="用户名", required=True, example="alice"), + ExchangeVariable(key="mobile", label="手机号"), + ), + ) + + def test_exchange_module_is_auto_registered(): app = make_app() @@ -47,133 +59,105 @@ def test_exchange_module_is_auto_registered(): assert "exchange:template:list" in app.state.iti_modules.permissions -def test_template_version_workbook_roundtrip(): - snapshot = ExchangeTemplateSnapshot( - id="v1", - version="1.0.0", - template_id="tpl1", - template_kind="import", - bindings=( - ExchangeTemplateBinding(entity="order", template_kind="import", title="订单"), - ), - fields=( - ExchangeField(key="name", label="名称", placeholder="{{name}}", source="name"), - ), - placeholders=( - ExchangePlaceholder(key="tenant", label="租户", example="demo"), - ), - meta={"title": "订单模板", "sheet_name": "模板"}, - ) +def test_business_spec_registry_builds_catalog_and_handler(): + app = make_app() + + def handler(context: ExchangeTaskContext) -> ExchangeTaskResult: + return ExchangeTaskResult(success_count=1, result_payload={"task": context.task_id}) + + spec = register_exchange_spec(app, user_import_spec(), handler=handler) + registry = get_exchange_registry(app) + + assert spec.generated_code() == "system.user.import" + assert registry.get_scope_handler( + biz_domain="system", + biz_obj="user", + operation="import", + ) is handler + assert registry.catalog()[0]["objects"][0]["operations"][0]["variables"][0]["key"] == "username" + + +def test_template_workbook_roundtrip_uses_business_variables(): + plan = user_import_spec().to_plan(template_id="tpl1", version_id="v1", version="1.0.0") codec = ExcelTemplateCodec() - content = codec.dump(snapshot) + + content = codec.dump(plan) parsed = codec.load(content) - assert parsed["title"] == "订单模板" - assert parsed["sheet_name"] == "模板" - assert parsed["bindings"][0]["entity"] == "order" - assert parsed["fields"][0]["key"] == "name" - assert parsed["placeholders"][0]["key"] == "tenant" + assert parsed["title"] == "用户导入" + assert parsed["sheet_name"] == "用户" + assert parsed["variables"][0]["key"] == "username" + assert parsed["variables"][0]["label"] == "用户名" -def test_exchange_service_create_publish_and_task_flow(): +def test_service_create_publish_resolve_and_task_flow(): reset_db() app = make_app() + register_exchange_spec(app, user_import_spec()) service = ExchangeService(app, app.state.db_sessionmaker()) template = service.create_template( - code="order", - name="订单", - template_kind="import", - entity="order", + name="用户导入模板", + biz_domain="system", + biz_obj="user", + operation="import", ) - assert template.code == "order" + assert template.code == "system.user.import" + + version = service.publish_version(template_id=template.id, version="1.0.0") + assert version.variables[0]["key"] == "username" - version = service.publish_version( + plan = service.resolve_plan( + biz_domain="system", + biz_obj="user", + operation="import", template_id=template.id, - version="1.0.0", - bindings=[ - ExchangeTemplateBinding(entity="order", template_kind="import", title="订单") - ], - fields=[ExchangeField(key="name", label="名称", source="name")], - placeholders=[ExchangePlaceholder(key="tenant", label="租户")], ) - assert version.version == "1.0.0" + assert plan.version_id == version.id + assert plan.variables[0].key == "username" task = service.create_task( + biz_domain="system", + biz_obj="user", + operation="import", template_id=template.id, version_id=version.id, - task_kind="import", input_payload={"source": "upload"}, ) - assert task.task_kind == "import" + assert task.operation == "import" assert task.template_version_id == version.id - workbook = ExcelWorkbookCodec().export_rows_with_template( - fields=[ExchangeField(key="name", label="名称", source="name")], - rows=[{"name": "A"}], - sheet_name="导出", - ) - rows = ExcelWorkbookCodec().import_rows(workbook) - assert rows[0]["名称"] == "A" - -def test_mapping_source_supports_template_less_workbook_roundtrip(): - source = MappingExchangeSource() - plan = source.resolve_plan( - template_kind="import", - fields=[ - ExchangeField(key="name", label="名称", placeholder="{{name}}", source="name"), - ExchangeField(key="age", label="年龄", source="age"), - ], - placeholders=[ExchangePlaceholder(key="tenant", label="租户")], - title="映射模板", - sheet_name="映射", - ) - - codec = ExcelTemplateCodec() - workbook = codec.dump(plan) - parsed = codec.load(workbook) - - assert parsed["title"] == "映射模板" - assert parsed["sheet_name"] == "映射" - assert parsed["fields"][0]["source"] == "name" - rows = ExcelWorkbookCodec().export_rows_with_plan( - plan=plan, - rows=[{"name": "Alice", "age": 18}], - ) - imported = ExcelWorkbookCodec().import_rows_with_fields(rows, fields=list(plan.fields)) - assert imported[0]["name"] == "Alice" - assert imported[0]["age"] == 18 - - -def test_excel_workbook_codec_roundtrip_uses_pandas_and_preserves_empty_cells(): +def test_excel_workbook_maps_headers_to_business_variables(): + variables = [ + ExchangeVariable(key="username", label="用户名"), + ExchangeVariable(key="mobile", label="手机号"), + ] codec = ExcelWorkbookCodec() - content = codec.export_rows( - headers=["名称", "年龄"], - rows=[{"名称": "Alice", "年龄": None}], - sheet_name="导出", + content = codec.export_rows_with_variables( + variables=variables, + rows=[{"username": "alice", "mobile": None}], + sheet_name="用户", ) - rows = codec.import_rows(content) + rows = codec.import_rows_with_variables(content, variables=variables) - assert rows == [{"名称": "Alice", "年龄": None}] + assert rows == [{"username": "alice", "mobile": None}] -def test_excel_workbook_codec_with_fields_maps_headers_to_target_keys(): - codec = ExcelWorkbookCodec() - fields = [ - ExchangeField(key="name", label="名称", source="name"), - ExchangeField(key="age", label="年龄", target="age_import"), - ] - content = codec.export_rows_with_template( - fields=fields, - rows=[{"name": "Alice", "age": 18}], - sheet_name="映射", +def test_mapping_source_supports_template_less_plan(): + source = MappingExchangeSource() + plan = source.resolve_plan( + biz_domain="system", + biz_obj="user", + operation="import", + variables=[ExchangeVariable(key="username", label="用户名")], + layout={"title": "用户导入", "sheet_name": "用户"}, ) - rows = codec.import_rows_with_fields(content, fields=fields) - - assert rows == [{"name": "Alice", "age_import": 18}] + assert plan.generated_code() == "system.user.import" + assert plan.layout.sheet_name == "用户" + assert plan.variables[0].key == "username" def test_remote_source_resolves_plan_and_template_bytes(): @@ -185,20 +169,18 @@ def test_remote_source_resolves_plan_and_template_bytes(): 200, json={ "data": { - "id": "v1", - "template_id": "tpl1", + "scope": { + "bizDomain": "system", + "bizObj": "user", + "operation": "import", + }, + "templateId": "tpl1", + "versionId": "v1", "version": "1.0.0", - "template_kind": "import", - "bindings": [ - {"entity": "order", "template_kind": "import", "title": "订单"} - ], - "fields": [ - {"key": "name", "label": "名称", "source": "name"} - ], - "placeholders": [ - {"key": "tenant", "label": "租户"} - ], - "meta": {"title": "订单模板", "sheet_name": "模板"}, + "code": "system.user.import", + "name": "用户导入", + "layout": {"title": "用户导入", "sheetName": "用户"}, + "variables": [{"key": "username", "label": "用户名"}], }, "code": 200, "message": "成功", @@ -206,14 +188,12 @@ def test_remote_source_resolves_plan_and_template_bytes(): ) if request.url.path == "/exchange/template-versions/v1/download": content = ExcelTemplateCodec().dump( - ExchangePlan.from_mapping( - template_kind=ExchangeTemplateKind.IMPORT, - template_id="tpl1", + ExchangeTemplatePlan.from_mapping( + biz_domain="system", + biz_obj="user", + operation="import", version_id="v1", - version="1.0.0", - fields=[ExchangeField(key="name", label="名称", source="name")], - title="订单模板", - sheet_name="模板", + variables=[ExchangeVariable(key="username", label="用户名")], ) ) return httpx.Response( @@ -233,10 +213,15 @@ def test_remote_source_resolves_plan_and_template_bytes(): ) source = RemoteExchangeSource(app, service_name="template_center") - plan = source.resolve_plan(template_kind="import", version_id="v1") + plan = source.resolve_plan( + biz_domain="system", + biz_obj="user", + operation="import", + version_id="v1", + ) assert plan.template_id == "tpl1" - assert plan.title == "订单模板" - assert plan.fields[0].source == "name" + assert plan.name == "用户导入" + assert plan.variables[0].key == "username" content = source.load_template_file(plan) assert content is not None @@ -246,23 +231,24 @@ def test_remote_source_resolves_plan_and_template_bytes(): def test_custom_registered_source_can_drive_plan_and_file(): @dataclass class CustomSource: - plan: ExchangePlan + plan: ExchangeTemplatePlan content: bytes def resolve_plan(self, **kwargs): return self.plan - def load_template_file(self, plan: ExchangePlan) -> bytes | None: + def load_template_file(self, plan: ExchangeTemplatePlan) -> bytes | None: return self.content app = make_app() - custom_plan = ExchangePlan.from_mapping( - template_kind="import", + custom_plan = ExchangeTemplatePlan.from_mapping( + biz_domain="system", + biz_obj="user", + operation="import", template_id="tpl-custom", version_id="v-custom", version="1.0.0", - fields=[ExchangeField(key="name", label="名称", source="name")], - title="自定义模板", + variables=[ExchangeVariable(key="username", label="用户名")], ) register_exchange_source( app, @@ -270,13 +256,13 @@ def test_custom_registered_source_can_drive_plan_and_file(): CustomSource(plan=custom_plan, content=b"custom-template"), ) - assert get_exchange_registry(app).get_source("custom-center") is not None - client = TestClient(app) response = client.post( "/exchange/plans/resolve", json={ - "templateKind": "import", + "bizDomain": "system", + "bizObj": "user", + "operation": "import", "sourceName": "custom-center", }, ) @@ -286,7 +272,9 @@ def test_custom_registered_source_can_drive_plan_and_file(): file_resp = client.post( "/exchange/plans/template-file", json={ - "templateKind": "import", + "bizDomain": "system", + "bizObj": "user", + "operation": "import", "sourceName": "custom-center", }, ) @@ -294,17 +282,29 @@ def test_custom_registered_source_can_drive_plan_and_file(): assert file_resp.content == b"custom-template" -def test_exchange_routes_are_available(): +def test_exchange_routes_support_template_and_catalog_flow(): app = make_app() + register_exchange_spec(app, user_import_spec()) client = TestClient(app) + catalog = client.get("/exchange/catalog") + assert catalog.status_code == 200 + assert catalog.json()["data"][0]["biz_domain"] == "system" + + code = client.get( + "/exchange/templates/code", + params={"biz_domain": "system", "biz_obj": "user", "operation": "import"}, + ) + assert code.status_code == 200 + assert code.json()["data"]["code"] == "system.user.import" + created = client.post( "/exchange/templates", json={ - "code": "order", - "name": "订单", - "template_kind": "import", - "entity": "order", + "name": "用户导入模板", + "bizDomain": "system", + "bizObj": "user", + "operation": "import", }, ) assert created.status_code == 200 @@ -312,49 +312,51 @@ def test_exchange_routes_are_available(): version = client.post( f"/exchange/templates/{template_id}/versions", - json={ - "version": "1.0.0", - "bindings": [], - "fields": [], - "placeholders": [], - }, + json={"version": "1.0.0"}, ) assert version.status_code == 200 + assert version.json()["data"]["variables"][0]["key"] == "username" - listed = client.get("/exchange/templates") - assert listed.status_code == 200 - assert listed.json()["data"][0]["code"] == "order" - - -def test_exchange_plan_routes_support_mapping_source(): - app = make_app() - client = TestClient(app) - - response = client.post( + plan = client.post( "/exchange/plans/resolve", json={ - "taskKind": "import", - "sourceKind": "mapping", - "fields": [ - {"key": "name", "label": "名称", "source": "name"}, - ], - "title": "映射", - "sheetName": "映射", + "bizDomain": "system", + "bizObj": "user", + "operation": "import", + "templateId": template_id, }, ) - assert response.status_code == 200 - assert response.json()["data"]["title"] == "映射" + assert plan.status_code == 200 + assert plan.json()["data"]["version_id"] == version.json()["data"]["id"] - file_resp = client.post( - "/exchange/plans/template-file", - json={ - "taskKind": "import", - "sourceKind": "mapping", - "fields": [ - {"key": "name", "label": "名称", "source": "name"}, - ], - "title": "映射", - "sheetName": "映射", - }, + +def test_task_run_uses_registered_business_handler(): + reset_db() + app = make_app() + + def handler(context: ExchangeTaskContext) -> ExchangeTaskResult: + assert context.plan.scope.biz_domain == "system" + return ExchangeTaskResult(success_count=2, result_payload={"handled": True}) + + register_exchange_spec(app, user_import_spec(), handler=handler) + service = ExchangeService(app, app.state.db_sessionmaker()) + template = service.create_template( + name="用户导入模板", + biz_domain="system", + biz_obj="user", + operation="import", ) - assert file_resp.status_code == 200 + version = service.publish_version(template_id=template.id, version="1.0.0") + task = service.create_task( + biz_domain="system", + biz_obj="user", + operation="import", + template_id=template.id, + version_id=version.id, + ) + + finished = service.run_task(task.id) + + assert finished.status == "success" + assert finished.success_count == 2 + assert finished.result_payload == {"handled": True}