From 1525f3503699ac95cdb0480167304674d88529a1 Mon Sep 17 00:00:00 2001 From: NoahLan <6995syu@163.com> Date: Sat, 16 May 2026 00:39:05 +0800 Subject: [PATCH] =?UTF-8?q?chore:=20excel=E5=AF=BC=E5=85=A5=E5=AF=BC?= =?UTF-8?q?=E5=87=BA=E6=A8=A1=E6=9D=BF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 1 + copier-template/migrations/env.py | 1 + docs/CONFIGURATION.md | 3 + docs/EXCHANGE.md | 67 ++++ docs/MODULES.md | 12 + docs/README.md | 1 + iti/app.py | 13 +- iti/config.py | 3 + iti/exchange/__init__.py | 53 +++ iti/exchange/base.py | 247 ++++++++++++++ iti/exchange/excel.py | 362 +++++++++++++++++++++ iti/exchange/models.py | 129 ++++++++ iti/exchange/module.py | 30 ++ iti/exchange/plan.py | 43 +++ iti/exchange/registry.py | 91 ++++++ iti/exchange/routes.py | 457 ++++++++++++++++++++++++++ iti/exchange/schemas.py | 152 +++++++++ iti/exchange/service.py | 513 ++++++++++++++++++++++++++++++ iti/exchange/sources.py | 286 +++++++++++++++++ iti/exchange/tasks.py | 59 ++++ migrations/env.py | 1 + tests/test_exchange.py | 360 +++++++++++++++++++++ 22 files changed, 2883 insertions(+), 1 deletion(-) create mode 100644 docs/EXCHANGE.md create mode 100644 iti/exchange/__init__.py create mode 100644 iti/exchange/base.py create mode 100644 iti/exchange/excel.py create mode 100644 iti/exchange/models.py create mode 100644 iti/exchange/module.py create mode 100644 iti/exchange/plan.py create mode 100644 iti/exchange/registry.py create mode 100644 iti/exchange/routes.py create mode 100644 iti/exchange/schemas.py create mode 100644 iti/exchange/service.py create mode 100644 iti/exchange/sources.py create mode 100644 iti/exchange/tasks.py create mode 100644 tests/test_exchange.py diff --git a/README.md b/README.md index 4b7ae8f..a97d287 100644 --- a/README.md +++ b/README.md @@ -109,6 +109,7 @@ cd ../my-system-app - [架构](docs/ARCHITECTURE.md) - [配置](docs/CONFIGURATION.md) - [模块协议](docs/MODULES.md) +- [模板与导入导出](docs/EXCHANGE.md) - [服务客户端](docs/SERVICE_CLIENT.md) - [任务运行器](docs/TASKS.md) - [数据库迁移](docs/MIGRATIONS.md) diff --git a/copier-template/migrations/env.py b/copier-template/migrations/env.py index 78585c7..089bc4b 100644 --- a/copier-template/migrations/env.py +++ b/copier-template/migrations/env.py @@ -8,6 +8,7 @@ from sqlalchemy import engine_from_config, pool from config import config as app_config from iti.db import Base +from iti.exchange import models as _exchange_models from {{ project_slug }}.models import import_models diff --git a/docs/CONFIGURATION.md b/docs/CONFIGURATION.md index 2346a8a..6ca8f5d 100644 --- a/docs/CONFIGURATION.md +++ b/docs/CONFIGURATION.md @@ -74,6 +74,9 @@ MYSQL_DATABASE=iti_dev | `services` | 服务客户端配置 | | `service_tokens` | 可信服务 token | | `tasks_enabled` | 是否启动单机任务调度线程 | +| `exchange_enabled` | 是否默认自动挂载本地交换模块 | +| `exchange_default_storage` | 导入导出默认文件存储类型 | +| `exchange_storage` | 导入导出文件存储配置 | | `log_dir` | 运行日志目录 | | `log_file_enabled` | 是否写滚动日志文件 | | `audit_enabled` | 是否发送审计事件 | diff --git a/docs/EXCHANGE.md b/docs/EXCHANGE.md new file mode 100644 index 0000000..8871dee --- /dev/null +++ b/docs/EXCHANGE.md @@ -0,0 +1,67 @@ +# 模板与导入导出 + +iTi-Flask 提供统一的数据模板、导入和导出基础能力。 +框架提供模板源抽象、对象、存储、任务和文件处理。 +业务系统自己决定入口、字段语义、模板中心和回调处理。 + +## 设计边界 + +- 模板按业务实体建模,不按单表字段映射建模。 +- 导入模板和导出模板独立。 +- 模板支持编辑和版本管理。 +- 模板也支持上传 Excel 作为来源。 +- 执行时按任务显式选择已发布版本。 +- 导入和导出都走任务。 +- 没有模板时,框架按字段映射兜底。 +- 模板源可来自本地表、远程模板中心、业务自定义 provider 或纯映射输入。 +- `system` 不是能力开关。 +- 业务项目可以自建模板中心,也可以挂远程模板中心 RPC。 +- 只要实现 `ExchangeSource` 并注册进 `app.state.iti_exchange`,就能接入同一套计划解析和模板文件生成入口。 + +## 框架对象 + +- `ExchangeTemplate` +- `ExchangeTemplateSnapshot` +- `ExchangePlan` +- `ExchangeTemplateBinding` +- `ExchangeField` +- `ExchangePlaceholder` +- `ExchangeTemplateSource` +- `ExchangeTemplateSourceKind` +- `ExchangeTaskKind` +- `ExchangeSource` + +## 入口 + +- `iti.exchange` +- `iti.exchange.service.ExchangeService` +- `iti.exchange.routes.router` +- `iti.exchange.module.create_exchange_module()` + +## 配置 + +```python +class DevConfig(BaseDevConfig): + def __init__(self) -> None: + super().__init__() + self.exchange_enabled = True +``` + +需要文件存储时复用 `file_storage`。 + +## 业务接法 + +业务项目可直接注册 `create_exchange_module()`,也可以自己写模块,只复用 `ExchangeService`、`ExchangeSource`、`register_exchange_source()` 和任务注册接口。 + +业务通常要自己补: + +- 模板字段和 placeholder 定义。 +- 模板发布流程。 +- 模板中心 RPC、本地表维护方式,或自定义 source 注册方式。 +- 导入回执和导出文件命名。 +- 任务执行器里的实际业务处理。 +- 菜单和页面入口。 + +模板中心可以由 `system` 提供,也可以由业务项目自建。框架只提供统一的计划解析、模板文件生成、Excel 读写和 source 接入层。 + +Excel 数据处理走 `pandas`。`openpyxl` 只处理模板结构和格式。 diff --git a/docs/MODULES.md b/docs/MODULES.md index b74dac3..b7fd6d3 100644 --- a/docs/MODULES.md +++ b/docs/MODULES.md @@ -73,3 +73,15 @@ def register_permissions(self, app): 具体授权由 `PermissionProvider` 决定。 单独使用 `iti-flask` 时可注入自己的 provider。 使用 `iti-system` 时由系统包提供数据库 provider。 + +## 模板与导入导出 + +框架内置的交换能力由 `iti.exchange.module.create_exchange_module()` 提供。 +业务模块可以直接复用它,也可以只复用 `ExchangeService`、`ExchangeSource`、`register_exchange_source()`、`register_exchange_task()` 和 `router`。 +模板中心可以由 `system` 承载,也可以由业务模块自建。框架侧能力不依赖 `system` 是否存在。 + +```python +from iti.exchange.module import create_exchange_module + +app = create_app(modules=[create_exchange_module()]) +``` diff --git a/docs/README.md b/docs/README.md index 5310a17..f678202 100644 --- a/docs/README.md +++ b/docs/README.md @@ -9,6 +9,7 @@ AI 修改框架时优先读 `.codex/skills/iti-flask-framework/SKILL.md`。 - [架构](ARCHITECTURE.md) - [配置](CONFIGURATION.md) - [模块协议](MODULES.md) +- [模板与导入导出](EXCHANGE.md) - [Copier 模板](COPIER_TEMPLATE.md) - [测试与部署](TESTING_DEPLOYMENT.md) diff --git a/iti/app.py b/iti/app.py index b222acf..67e1827 100644 --- a/iti/app.py +++ b/iti/app.py @@ -43,6 +43,8 @@ from iti.health import router as health_router from iti.limiter import SimpleLimiter from iti.logging_config import configure_logging, log_extra from iti.modules import init_modules +from iti.exchange import get_exchange_registry +from iti.exchange import models as _exchange_models from iti.responses.auto import is_envelope_payload, is_raw_response_request from iti.responses import fail from iti.service_client import init_service_clients @@ -93,6 +95,7 @@ def create_app( app.state.cache = CacheManager(default_timeout=config.cache_default_timeout) app.state.limiter = SimpleLimiter(enabled=config.ratelimit_enabled) app.state.permission_provider = permission_provider or StaticPermissionProvider() + app.state.exchange_enabled = config.exchange_enabled init_middlewares(app) @@ -116,8 +119,16 @@ def create_app( init_error_handlers(app) init_service_clients(app, config.services) init_task_runner(app) + get_exchange_registry(app) init_audit(app) - module_registry = init_modules(app, modules) + module_list = list(modules or []) + if config.exchange_enabled and not any( + getattr(module, "name", None) == "exchange" for module in module_list + ): + from iti.exchange.module import create_exchange_module + + module_list.append(create_exchange_module()) + module_registry = init_modules(app, module_list) app.state.iti_modules = module_registry if config.health_enabled: app.include_router(health_router) diff --git a/iti/config.py b/iti/config.py index 5d8b4e1..207d9fb 100644 --- a/iti/config.py +++ b/iti/config.py @@ -102,6 +102,9 @@ class BaseConfig: services: dict[str, dict[str, Any]] = field(default_factory=dict) service_tokens: dict[str, str] = field(default_factory=dict) tasks_enabled: bool = False + exchange_enabled: bool = True + exchange_default_storage: str = "local" + exchange_storage: dict[str, Any] = field(default_factory=dict) log_level: str = "INFO" log_dir: str = field(default_factory=lambda: str(BASE_DIR / "runtime" / "logs")) diff --git a/iti/exchange/__init__.py b/iti/exchange/__init__.py new file mode 100644 index 0000000..fcfca9e --- /dev/null +++ b/iti/exchange/__init__.py @@ -0,0 +1,53 @@ +from .base import ( + DataExchangeModule, + ExchangeField, + ExchangePlaceholder, + ExchangePlan, + ExchangeTemplate, + ExchangeTemplateBinding, + ExchangeTemplateKind, + ExchangeTemplateSource, + ExchangeTemplateSourceKind, + ExchangeTemplateSnapshot, + ExchangeTaskKind, +) +from .plan import ExchangeMappingPlanInput +from .registry import ( + ExchangeRegistry, + get_exchange_registry, + get_exchange_source_by_name, + register_exchange_source, +) +from .sources import ( + ExchangeSource, + LocalExchangeSource, + MappingExchangeSource, + RemoteExchangeSource, + get_exchange_source, +) +from .tasks import register_exchange_task + +__all__ = [ + "DataExchangeModule", + "ExchangeField", + "ExchangeMappingPlanInput", + "ExchangePlaceholder", + "ExchangePlan", + "ExchangeRegistry", + "ExchangeTemplate", + "ExchangeTemplateBinding", + "ExchangeTemplateKind", + "ExchangeTemplateSource", + "ExchangeTemplateSourceKind", + "ExchangeTemplateSnapshot", + "ExchangeTaskKind", + "ExchangeSource", + "LocalExchangeSource", + "get_exchange_registry", + "get_exchange_source_by_name", + "get_exchange_source", + "MappingExchangeSource", + "register_exchange_source", + "register_exchange_task", + "RemoteExchangeSource", +] diff --git a/iti/exchange/base.py b/iti/exchange/base.py new file mode 100644 index 0000000..9b3bfee --- /dev/null +++ b/iti/exchange/base.py @@ -0,0 +1,247 @@ +from __future__ import annotations + +from dataclasses import dataclass, field +from enum import Enum +from typing import Any, Protocol, Sequence + + +class ExchangeTemplateKind(str, Enum): + IMPORT = "import" + EXPORT = "export" + + +class ExchangeTaskKind(str, Enum): + IMPORT = "import" + EXPORT = "export" + + +@dataclass(frozen=True) +class ExchangePlaceholder: + key: str + label: str + description: str | None = None + required: bool = False + example: str | None = None + + +@dataclass(frozen=True) +class ExchangeField: + key: str + label: str + placeholder: str | None = None + required: bool = False + example: str | None = None + width: int | None = None + format: str | None = None + source: str | None = None + target: str | None = None + options: tuple[tuple[str, str], ...] = () + meta: dict[str, Any] = field(default_factory=dict) + + def workbook_header(self) -> str: + return self.placeholder or self.label or self.key + + def export_source_key(self) -> str: + return self.source or self.key + + def import_target_key(self) -> str: + return self.target or self.key + + +@dataclass(frozen=True) +class ExchangeTemplateBinding: + entity: str + template_kind: ExchangeTemplateKind + handler: str | None = None + description: str | None = None + default_sheet_name: str | None = None + default_file_name: str | None = None + title: str | None = None + meta: dict[str, Any] = field(default_factory=dict) + + +class ExchangeTemplateSourceKind(str, Enum): + LOCAL = "local" + REMOTE = "remote" + MAPPING = "mapping" + CUSTOM = "custom" + + +@dataclass(frozen=True) +class ExchangeTemplateSnapshot: + id: str + version: str + template_id: str + template_kind: ExchangeTemplateKind + bindings: tuple[ExchangeTemplateBinding, ...] = () + published_at: str | None = None + file_key: str | None = None + checksum: str | None = None + fields: tuple[ExchangeField, ...] = () + placeholders: tuple[ExchangePlaceholder, ...] = () + meta: dict[str, Any] = field(default_factory=dict) + + def to_plan(self) -> "ExchangePlan": + meta = dict(self.meta) + return ExchangePlan( + template_kind=self.template_kind, + template_id=self.template_id, + version_id=self.id, + version=self.version, + bindings=self.bindings, + fields=self.fields, + placeholders=self.placeholders, + title=meta.get("title"), + description=meta.get("description"), + sheet_name=meta.get("sheet_name"), + meta=meta, + ) + + +@dataclass(frozen=True) +class ExchangePlan: + template_kind: ExchangeTemplateKind + template_id: str | None = None + version_id: str | None = None + version: str | None = None + bindings: tuple[ExchangeTemplateBinding, ...] = () + fields: tuple[ExchangeField, ...] = () + placeholders: tuple[ExchangePlaceholder, ...] = () + title: str | None = None + description: str | None = None + sheet_name: str | None = None + meta: dict[str, Any] = field(default_factory=dict) + + @classmethod + def from_mapping( + cls, + *, + template_kind: ExchangeTemplateKind | str, + template_id: str | None = None, + version_id: str | None = None, + version: str | None = None, + bindings: Sequence[ExchangeTemplateBinding] | None = None, + fields: Sequence[ExchangeField] | None = None, + placeholders: Sequence[ExchangePlaceholder] | None = None, + title: str | None = None, + description: str | None = None, + sheet_name: str | None = None, + meta: dict[str, Any] | None = None, + ) -> "ExchangePlan": + return cls( + template_kind=ExchangeTemplateKind(template_kind), + template_id=template_id, + version_id=version_id, + version=version, + bindings=tuple(bindings or ()), + fields=tuple(fields or ()), + placeholders=tuple(placeholders or ()), + title=title, + description=description, + sheet_name=sheet_name, + meta=meta or {}, + ) + + def resolved_meta(self) -> dict[str, Any]: + meta = dict(self.meta) + if self.template_id is not None: + meta.setdefault("template_id", self.template_id) + if self.version_id is not None: + meta.setdefault("version_id", self.version_id) + if self.version is not None: + meta.setdefault("version", self.version) + if self.title is not None: + meta.setdefault("title", self.title) + if self.description is not None: + meta.setdefault("description", self.description) + if self.sheet_name is not None: + meta.setdefault("sheet_name", self.sheet_name) + return meta + + +@dataclass(frozen=True) +class ExchangeTemplateSource: + kind: ExchangeTemplateSourceKind + template_kind: ExchangeTemplateKind + template_id: str | None = None + version_id: str | None = None + version: str | None = None + service: str | None = None + bindings: tuple[ExchangeTemplateBinding, ...] = () + fields: tuple[ExchangeField, ...] = () + placeholders: tuple[ExchangePlaceholder, ...] = () + title: str | None = None + description: str | None = None + sheet_name: str | None = None + meta: dict[str, Any] = field(default_factory=dict) + + def to_plan(self) -> ExchangePlan: + meta = dict(self.meta) + return ExchangePlan( + template_kind=self.template_kind, + template_id=self.template_id, + version_id=self.version_id, + version=self.version, + bindings=self.bindings, + fields=self.fields, + placeholders=self.placeholders, + title=self.title, + description=self.description, + sheet_name=self.sheet_name, + meta=meta, + ) + + +@dataclass(frozen=True) +class ExchangeTemplate: + id: str + code: str + name: str + template_kind: ExchangeTemplateKind + entity: str + status: str = "draft" + description: str | None = None + current_version: str | None = None + bindings: tuple[ExchangeTemplateBinding, ...] = () + fields: tuple[ExchangeField, ...] = () + placeholders: tuple[ExchangePlaceholder, ...] = () + meta: dict[str, Any] = field(default_factory=dict) + + def to_plan(self) -> ExchangePlan: + meta = dict(self.meta) + return ExchangePlan( + template_kind=self.template_kind, + template_id=self.id, + version_id=self.current_version, + bindings=self.bindings, + fields=self.fields, + placeholders=self.placeholders, + title=self.name, + description=self.description, + sheet_name=meta.get("sheet_name"), + meta={ + "code": self.code, + "status": self.status, + "current_version": self.current_version, + **meta, + }, + ) + + +class DataExchangeModule(Protocol): + name: str + + def init_app(self, app) -> None: + ... + + def register_routes(self, app) -> None: + ... + + def register_permissions(self, app) -> None: + ... + + def register_menu_seed(self, app) -> None: + ... + + def register_tasks(self, app) -> None: + ... diff --git a/iti/exchange/excel.py b/iti/exchange/excel.py new file mode 100644 index 0000000..b74a274 --- /dev/null +++ b/iti/exchange/excel.py @@ -0,0 +1,362 @@ +from __future__ import annotations + +from dataclasses import dataclass +from io import BytesIO +from typing import Any + +import pandas as pd +from openpyxl import Workbook, load_workbook +from openpyxl.worksheet.worksheet import Worksheet + +from .base import ExchangeField, ExchangePlaceholder, ExchangePlan, ExchangeTemplateSnapshot + + +@dataclass +class ExcelTemplateCodec: + """Render and parse template workbooks.""" + + def build_workbook(self, snapshot: ExchangeTemplateSnapshot | ExchangePlan) -> Workbook: + workbook = Workbook() + worksheet = workbook.active + sheet_name = getattr(snapshot, "sheet_name", None) + worksheet.title = ( + sheet_name + or snapshot.meta.get("sheet_name") + or (snapshot.bindings[0].default_sheet_name if snapshot.bindings else None) + or _safe_sheet_name("Template") + ) + row = self._write_header(worksheet, snapshot) + row = self._write_bindings(worksheet, snapshot, row) + row = self._write_placeholders(worksheet, snapshot, row) + self._write_fields(worksheet, snapshot, row) + return workbook + + def _write_header( + self, worksheet: Worksheet, snapshot: ExchangeTemplateSnapshot | ExchangePlan + ) -> int: + meta = snapshot.meta if hasattr(snapshot, "meta") else {} + title = getattr(snapshot, "title", None) + version = getattr(snapshot, "version", None) + worksheet["A1"] = title or meta.get("title") or snapshot.template_id or "Template" + if version: + worksheet["A2"] = f"version: {version}" + elif meta.get("version"): + worksheet["A2"] = f"version: {meta['version']}" + description = getattr(snapshot, "description", None) + if meta.get("description") or description: + worksheet["A3"] = meta.get("description") or description + return 5 + + def _write_bindings( + self, worksheet: Worksheet, snapshot: ExchangeTemplateSnapshot | ExchangePlan, row: int + ) -> int: + if not snapshot.bindings: + return row + worksheet.cell(row=row, column=1, value="Bindings") + row += 1 + headers = [ + "entity", + "template_kind", + "handler", + "description", + "default_sheet_name", + "default_file_name", + "title", + ] + for col, value in enumerate(headers, start=1): + worksheet.cell(row=row, column=col, value=value) + row += 1 + for binding in snapshot.bindings: + values = [ + binding.entity, + _enum_value(binding.template_kind), + binding.handler, + binding.description, + binding.default_sheet_name, + binding.default_file_name, + binding.title, + ] + for col, value in enumerate(values, start=1): + worksheet.cell(row=row, column=col, value=value) + row += 1 + return row + + def _write_placeholders( + self, worksheet: Worksheet, snapshot: ExchangeTemplateSnapshot | ExchangePlan, row: int + ) -> int: + if not snapshot.placeholders: + return row + worksheet.cell(row=row, column=1, value="Placeholders") + row += 1 + for placeholder in snapshot.placeholders: + worksheet.cell(row=row, column=1, value=placeholder.key) + worksheet.cell(row=row, column=2, value=placeholder.label) + worksheet.cell(row=row, column=3, value=placeholder.description) + worksheet.cell(row=row, column=4, value=placeholder.example) + worksheet.cell(row=row, column=5, value=placeholder.required) + row += 1 + return row + + def _write_fields( + self, worksheet: Worksheet, snapshot: ExchangeTemplateSnapshot | ExchangePlan, row: int + ) -> None: + worksheet.cell(row=row, column=1, value="Fields") + row += 1 + headers = [ + "key", + "label", + "placeholder", + "required", + "example", + "format", + "source", + "target", + ] + for col, value in enumerate(headers, start=1): + worksheet.cell(row=row, column=col, value=value) + row += 1 + for field in snapshot.fields: + values = [ + field.key, + field.label, + field.placeholder, + field.required, + field.example, + field.format, + field.source, + field.target, + ] + for col, value in enumerate(values, start=1): + worksheet.cell(row=row, column=col, value=value) + row += 1 + + def dump(self, snapshot: ExchangeTemplateSnapshot | ExchangePlan) -> bytes: + buffer = BytesIO() + self.build_workbook(snapshot).save(buffer) + return buffer.getvalue() + + def load(self, content: bytes) -> dict[str, Any]: + workbook = load_workbook(BytesIO(content)) + worksheet = workbook.active + payload = { + "title": worksheet["A1"].value, + "version": worksheet["A2"].value, + "description": worksheet["A3"].value, + "sheet_name": worksheet.title, + } + payload["bindings"], payload["placeholders"], payload["fields"] = self._parse_sections( + worksheet + ) + return payload + + def _parse_sections( + self, worksheet: Worksheet + ) -> tuple[list[dict[str, Any]], list[dict[str, Any]], list[dict[str, Any]]]: + bindings: list[dict[str, Any]] = [] + placeholders: list[dict[str, Any]] = [] + fields: list[dict[str, Any]] = [] + mode: str | None = None + headers: list[str] = [] + + for row in worksheet.iter_rows(values_only=True): + cells = [cell for cell in row] + first = cells[0] if cells else None + if first == "Bindings": + mode = "bindings_headers" + headers = [] + continue + if first == "Placeholders": + mode = "placeholders" + continue + if first == "Fields": + mode = "fields_headers" + headers = [] + continue + if mode == "bindings_headers": + headers = [str(cell) if cell is not None else "" for cell in cells] + if not headers: + continue + mode = "bindings" + continue + if mode == "bindings": + if not any(cell is not None for cell in cells): + continue + item = {headers[idx]: cells[idx] for idx in range(min(len(headers), len(cells)))} + if not item.get("entity") and not item.get("template_kind"): + continue + bindings.append( + { + "entity": item.get("entity"), + "template_kind": item.get("template_kind"), + "handler": item.get("handler"), + "description": item.get("description"), + "default_sheet_name": item.get("default_sheet_name"), + "default_file_name": item.get("default_file_name"), + "title": item.get("title"), + "meta": {}, + } + ) + continue + if mode == "placeholders": + if not any(cell is not None for cell in cells): + continue + placeholders.append( + { + "key": cells[0], + "label": cells[1], + "description": cells[2], + "example": cells[3], + "required": bool(cells[4]) if len(cells) > 4 else False, + } + ) + continue + if mode == "fields_headers": + headers = [str(cell) if cell is not None else "" for cell in cells] + if not headers: + continue + mode = "fields" + continue + if mode == "fields": + if not any(cell is not None for cell in cells): + continue + item = {headers[idx]: cells[idx] for idx in range(min(len(headers), len(cells)))} + if item.get("key") is None and item.get("label") is None: + continue + fields.append( + { + "key": item.get("key"), + "label": item.get("label"), + "placeholder": item.get("placeholder"), + "required": bool(item.get("required", False)), + "example": item.get("example"), + "format": item.get("format"), + "source": item.get("source"), + "target": item.get("target"), + "options": [], + "meta": {}, + } + ) + return bindings, placeholders, fields + + +@dataclass +class ExcelWorkbookCodec: + """Read and write exchange data workbooks.""" + + def export_rows( + self, + headers: list[str], + rows: list[dict[str, Any]], + *, + sheet_name: str = "Export", + ) -> bytes: + buffer = BytesIO() + dataframe = pd.DataFrame.from_records(rows, columns=headers) + with pd.ExcelWriter(buffer, engine="openpyxl") as writer: + dataframe.to_excel( + writer, + index=False, + sheet_name=_safe_sheet_name(sheet_name), + ) + return buffer.getvalue() + + def import_rows(self, content: bytes) -> list[dict[str, Any]]: + dataframe = self._read_sheet(content) + if dataframe.empty and len(dataframe.columns) == 0: + return [] + headers = [self._header_name(value) for value in dataframe.iloc[0].tolist()] + return self._frame_to_records(dataframe.iloc[1:], headers) + + def import_rows_with_fields( + self, + content: bytes, + *, + fields: list[ExchangeField], + ) -> list[dict[str, Any]]: + dataframe = self._read_sheet(content) + if dataframe.empty and len(dataframe.columns) == 0: + return [] + header_map = {field.workbook_header(): field.import_target_key() for field in fields} + headers = [self._header_name(value) for value in dataframe.iloc[0].tolist()] + return self._frame_to_records(dataframe.iloc[1:], headers, header_map=header_map) + + def export_rows_with_template( + self, + *, + fields: list[ExchangeField], + rows: list[dict[str, Any]], + sheet_name: str = "Export", + ) -> bytes: + headers = [field.workbook_header() for field in fields] + normalized_rows: list[dict[str, Any]] = [] + for row in rows: + item: dict[str, Any] = {} + for field in fields: + item[field.workbook_header()] = row.get(field.export_source_key()) + normalized_rows.append(item) + return self.export_rows(headers, normalized_rows, sheet_name=sheet_name) + + def export_rows_with_plan( + self, + *, + plan: ExchangePlan, + rows: list[dict[str, Any]], + sheet_name: str | None = None, + ) -> bytes: + return self.export_rows_with_template( + fields=list(plan.fields), + rows=rows, + sheet_name=sheet_name or plan.sheet_name or "Export", + ) + + def _read_sheet(self, content: bytes) -> pd.DataFrame: + workbook = load_workbook(BytesIO(content), read_only=True, data_only=True) + sheet_name = workbook.active.title + workbook.close() + return pd.read_excel( + BytesIO(content), + sheet_name=sheet_name, + header=None, + dtype=object, + engine="openpyxl", + ) + + def _frame_to_records( + self, + dataframe: pd.DataFrame, + headers: list[str], + *, + header_map: dict[str, str] | None = None, + ) -> list[dict[str, Any]]: + result: list[dict[str, Any]] = [] + for values in dataframe.itertuples(index=False, name=None): + item: dict[str, Any] = {} + for index, header in enumerate(headers): + if not header: + continue + key = header_map.get(header, header) if header_map is not None else header + value = values[index] if index < len(values) else None + item[key] = self._normalize_value(value) + result.append(item) + return result + + @staticmethod + def _normalize_value(value: Any) -> Any: + return None if pd.isna(value) else value + + @staticmethod + def _header_name(value: Any) -> str: + normalized = ExcelWorkbookCodec._normalize_value(value) + return "" if normalized is None else str(normalized) + + +def _safe_sheet_name(value: str) -> str: + cleaned = "".join(ch for ch in value if ch not in "[]:*?/\\") + cleaned = cleaned.strip() + if not cleaned: + cleaned = "Sheet" + return cleaned[:31] + + +def _enum_value(value: Any) -> Any: + return value.value if hasattr(value, "value") else value diff --git a/iti/exchange/models.py b/iti/exchange/models.py new file mode 100644 index 0000000..b7a82d5 --- /dev/null +++ b/iti/exchange/models.py @@ -0,0 +1,129 @@ +from __future__ import annotations + +from datetime import datetime +from typing import Any + +from sqlalchemy import DateTime, ForeignKey, Index, JSON, String, Text, UniqueConstraint +from sqlalchemy.orm import Mapped, mapped_column, relationship + +from iti.db import Base, IdMixin, TimestampMixin + + +class ExchangeTemplateModel(Base, IdMixin, TimestampMixin): + __tablename__ = "exchange_templates" + + code: Mapped[str] = mapped_column(String(128), unique=True, index=True, comment="模板编码") + name: Mapped[str] = mapped_column(String(255), comment="模板名称") + template_kind: Mapped[str] = mapped_column(String(32), index=True, comment="模板类型") + entity: Mapped[str] = mapped_column(String(128), index=True, comment="业务实体") + status: Mapped[str] = mapped_column(String(32), default="draft", index=True, comment="状态") + description: Mapped[str | None] = mapped_column(Text, nullable=True, comment="说明") + current_version: Mapped[str | None] = mapped_column( + String(64), nullable=True, index=True, comment="当前版本" + ) + meta: Mapped[dict[str, Any]] = mapped_column(JSON, default=dict, comment="扩展配置") + + versions: Mapped[list["ExchangeTemplateVersionModel"]] = relationship( + back_populates="template", + cascade="all, delete-orphan", + ) + + +class ExchangeTemplateVersionModel(Base, IdMixin, TimestampMixin): + __tablename__ = "exchange_template_versions" + __table_args__ = ( + UniqueConstraint("template_id", "version", name="uq_exchange_template_versions_template_version"), + Index("ix_exchange_template_versions_template_id_version", "template_id", "version"), + ) + + template_id: Mapped[str] = mapped_column( + String(36), + ForeignKey("exchange_templates.id", ondelete="CASCADE"), + index=True, + comment="模板ID", + ) + version: Mapped[str] = mapped_column(String(64), comment="版本号") + template_kind: Mapped[str] = mapped_column(String(32), index=True, comment="模板类型") + published_at: Mapped[datetime | None] = mapped_column( + DateTime, + nullable=True, + comment="发布时间", + ) + file_key: Mapped[str | None] = mapped_column(String(512), nullable=True, comment="模板文件") + checksum: Mapped[str | None] = mapped_column(String(128), nullable=True, comment="校验值") + bindings: Mapped[list[dict[str, Any]]] = mapped_column(JSON, default=list, comment="绑定配置") + fields: Mapped[list[dict[str, Any]]] = mapped_column(JSON, default=list, comment="字段定义") + placeholders: Mapped[list[dict[str, Any]]] = mapped_column(JSON, default=list, comment="占位符定义") + meta: Mapped[dict[str, Any]] = mapped_column(JSON, default=dict, comment="扩展配置") + + template: Mapped["ExchangeTemplateModel"] = relationship(back_populates="versions") + + +class ExchangeTaskModel(Base, IdMixin, TimestampMixin): + __tablename__ = "exchange_tasks" + __table_args__ = ( + Index("ix_exchange_tasks_template_id_kind_status", "template_id", "task_kind", "status"), + Index("ix_exchange_tasks_version_id", "template_version_id"), + ) + + template_id: Mapped[str | None] = mapped_column( + String(36), + ForeignKey("exchange_templates.id", ondelete="SET NULL"), + nullable=True, + index=True, + comment="模板ID", + ) + template_version_id: Mapped[str | None] = mapped_column( + String(36), + ForeignKey("exchange_template_versions.id", ondelete="SET NULL"), + nullable=True, + index=True, + comment="模板版本ID", + ) + task_kind: Mapped[str] = mapped_column(String(32), index=True, comment="任务类型") + status: Mapped[str] = mapped_column(String(32), default="pending", index=True, comment="状态") + requested_by: Mapped[str | None] = mapped_column(String(36), nullable=True, index=True, comment="发起人") + storage_key: Mapped[str | None] = mapped_column(String(512), nullable=True, comment="任务文件") + checksum: Mapped[str | None] = mapped_column(String(128), nullable=True, comment="校验值") + error_count: Mapped[int] = mapped_column(default=0, comment="错误数") + success_count: Mapped[int] = mapped_column(default=0, comment="成功数") + failed_count: Mapped[int] = mapped_column(default=0, comment="失败数") + message: Mapped[str | None] = mapped_column(Text, nullable=True, comment="消息") + input_payload: Mapped[dict[str, Any]] = mapped_column(JSON, default=dict, comment="输入参数") + result_payload: Mapped[dict[str, Any]] = mapped_column(JSON, default=dict, comment="输出结果") + started_at: Mapped[datetime | None] = mapped_column(DateTime, nullable=True, comment="开始时间") + finished_at: Mapped[datetime | None] = mapped_column(DateTime, nullable=True, comment="结束时间") + meta: Mapped[dict[str, Any]] = mapped_column(JSON, default=dict, comment="扩展配置") + + template: Mapped["ExchangeTemplateModel"] = relationship( + foreign_keys=[template_id] + ) + version: Mapped["ExchangeTemplateVersionModel"] = relationship( + foreign_keys=[template_version_id] + ) + rows: Mapped[list["ExchangeTaskRowModel"]] = relationship( + back_populates="task", + cascade="all, delete-orphan", + ) + + +class ExchangeTaskRowModel(Base, IdMixin, TimestampMixin): + __tablename__ = "exchange_task_rows" + __table_args__ = ( + UniqueConstraint("task_id", "row_index", name="uq_exchange_task_rows_task_row"), + Index("ix_exchange_task_rows_task_id_status", "task_id", "status"), + ) + + task_id: Mapped[str] = mapped_column( + String(36), + ForeignKey("exchange_tasks.id", ondelete="CASCADE"), + index=True, + comment="任务ID", + ) + row_index: Mapped[int] = mapped_column(comment="行号") + status: Mapped[str] = mapped_column(String(32), default="pending", index=True, comment="状态") + data: Mapped[dict[str, Any]] = mapped_column(JSON, default=dict, comment="原始数据") + message: Mapped[str | None] = mapped_column(Text, nullable=True, comment="错误信息") + result: Mapped[dict[str, Any]] = mapped_column(JSON, default=dict, comment="结果数据") + + task: Mapped["ExchangeTaskModel"] = relationship(back_populates="rows") diff --git a/iti/exchange/module.py b/iti/exchange/module.py new file mode 100644 index 0000000..8996cd9 --- /dev/null +++ b/iti/exchange/module.py @@ -0,0 +1,30 @@ +from __future__ import annotations + +from iti.modules import ModulePermission + +from .routes import router + + +class ExchangeModule: + name = "exchange" + + def register_routes(self, app) -> None: + app.include_router(router) + + def register_permissions(self, app) -> None: + app.state.iti_modules.register_permission( + ModulePermission("exchange:template:list", "数据模板列表") + ) + app.state.iti_modules.register_permission( + ModulePermission("exchange:template:manage", "数据模板管理") + ) + app.state.iti_modules.register_permission( + ModulePermission("exchange:task:create", "导入导出任务创建") + ) + app.state.iti_modules.register_permission( + ModulePermission("exchange:task:list", "导入导出任务列表") + ) + + +def create_exchange_module() -> ExchangeModule: + return ExchangeModule() diff --git a/iti/exchange/plan.py b/iti/exchange/plan.py new file mode 100644 index 0000000..0ce00bf --- /dev/null +++ b/iti/exchange/plan.py @@ -0,0 +1,43 @@ +from __future__ import annotations + +from dataclasses import dataclass +from typing import Any + +from .base import ( + ExchangeField, + ExchangePlaceholder, + ExchangePlan, + ExchangeTemplateBinding, + ExchangeTemplateKind, +) + + +@dataclass(frozen=True) +class ExchangeMappingPlanInput: + template_kind: ExchangeTemplateKind | str + template_id: str | None = None + version_id: str | None = None + version: str | None = None + bindings: list[ExchangeTemplateBinding] | None = None + fields: list[ExchangeField] | None = None + placeholders: list[ExchangePlaceholder] | None = None + title: str | None = None + description: str | None = None + sheet_name: str | None = None + meta: dict[str, Any] | None = None + + def to_plan(self) -> ExchangePlan: + return ExchangePlan.from_mapping( + template_kind=self.template_kind, + template_id=self.template_id, + version_id=self.version_id, + version=self.version, + bindings=self.bindings, + fields=self.fields, + placeholders=self.placeholders, + title=self.title, + description=self.description, + sheet_name=self.sheet_name, + meta=self.meta, + ) + diff --git a/iti/exchange/registry.py b/iti/exchange/registry.py new file mode 100644 index 0000000..f4f804f --- /dev/null +++ b/iti/exchange/registry.py @@ -0,0 +1,91 @@ +from __future__ import annotations + +from dataclasses import dataclass, field + +from .base import ExchangeTemplate, ExchangeTemplateSnapshot + + +@dataclass +class ExchangeRegistry: + templates: dict[str, ExchangeTemplate] = field(default_factory=dict) + versions: dict[str, ExchangeTemplateSnapshot] = field(default_factory=dict) + sources: dict[str, object] = field(default_factory=dict) + + def register_template(self, template: ExchangeTemplate) -> ExchangeTemplate: + if not template.id: + raise ValueError("template id is required") + if not template.code: + raise ValueError("template code is required") + if template.id in self.templates: + raise ValueError(f"template already registered: {template.id}") + self.templates[template.id] = template + return template + + def register_version( + self, snapshot: ExchangeTemplateSnapshot + ) -> ExchangeTemplateSnapshot: + if not snapshot.id: + raise ValueError("snapshot id is required") + if not snapshot.template_id: + raise ValueError("snapshot template id is required") + key = self._version_key(snapshot.template_id, snapshot.version) + if key in self.versions: + raise ValueError(f"template version already registered: {key}") + self.versions[key] = snapshot + return snapshot + + def get_template(self, template_id: str) -> ExchangeTemplate | None: + return self.templates.get(template_id) + + def get_version( + self, template_id: str, version: str + ) -> ExchangeTemplateSnapshot | None: + return self.versions.get(self._version_key(template_id, version)) + + def latest_version( + self, template_id: str + ) -> ExchangeTemplateSnapshot | None: + template = self.templates.get(template_id) + if template is None or not template.current_version: + return None + return self.get_version(template_id, template.current_version) + + def list_templates(self) -> list[ExchangeTemplate]: + return sorted(self.templates.values(), key=lambda item: (item.entity, item.code)) + + def list_versions(self, template_id: str | None = None) -> list[ExchangeTemplateSnapshot]: + snapshots = list(self.versions.values()) + if template_id is not None: + snapshots = [item for item in snapshots if item.template_id == template_id] + return sorted(snapshots, key=lambda item: (item.template_id, item.version)) + + def register_source(self, name: str, source: object) -> object: + if not name: + raise ValueError("source name is required") + if name in self.sources: + raise ValueError(f"exchange source already registered: {name}") + self.sources[name] = source + return source + + def get_source(self, name: str) -> object | None: + return self.sources.get(name) + + @staticmethod + def _version_key(template_id: str, version: str) -> str: + return f"{template_id}:{version}" + + +def get_exchange_registry(app) -> ExchangeRegistry: + registry = getattr(app.state, "iti_exchange", None) + if registry is None: + registry = ExchangeRegistry() + app.state.iti_exchange = registry + return registry + + +def register_exchange_source(app, name: str, source: object) -> object: + return get_exchange_registry(app).register_source(name, source) + + +def get_exchange_source_by_name(app, name: str) -> object | None: + return get_exchange_registry(app).get_source(name) diff --git a/iti/exchange/routes.py b/iti/exchange/routes.py new file mode 100644 index 0000000..6c0e4ff --- /dev/null +++ b/iti/exchange/routes.py @@ -0,0 +1,457 @@ +from __future__ import annotations + +from dataclasses import asdict + +from fastapi import APIRouter, Depends, File, Request, UploadFile +from fastapi.responses import StreamingResponse +from sqlalchemy.orm import Session + +from iti.db import get_db +from iti.exceptions import BizError +from iti.responses import ok, raw_response + +from .base import ExchangeField, ExchangePlaceholder, ExchangeTemplateBinding, ExchangePlan +from .schemas import ( + ExchangeFieldSchema, + ExchangePlanResolveRequest, + ExchangePlanTemplateFileRequest, + ExchangePlaceholderSchema, + ExchangeTemplateBindingSchema, + ExchangeTemplateSourceKind, + ExchangeTaskCreateRequest, + ExchangeTaskResponse, + ExchangeTemplateCreateRequest, + ExchangeTemplateResponse, + ExchangeTemplateVersionCreateRequest, + ExchangeTemplateVersionResponse, + ExchangeTemplateUpdateRequest, +) +from .service import ExchangeService +from .excel import ExcelTemplateCodec +from .sources import get_exchange_source + + +router = APIRouter(prefix="/exchange", tags=["exchange"]) + + +def _template_payload(item): + return { + "id": item.id, + "code": item.code, + "name": item.name, + "template_kind": item.template_kind, + "entity": item.entity, + "status": item.status, + "description": item.description, + "current_version": item.current_version, + "meta": item.meta, + "created_at": item.created_at, + "updated_at": item.updated_at, + } + + +def _version_payload(item): + return { + "id": item.id, + "template_id": item.template_id, + "version": item.version, + "template_kind": item.template_kind, + "published_at": item.published_at, + "file_key": item.file_key, + "checksum": item.checksum, + "bindings": item.bindings, + "fields": item.fields, + "placeholders": item.placeholders, + "meta": item.meta, + "created_at": item.created_at, + "updated_at": item.updated_at, + } + + +def _task_payload(item): + return { + "id": item.id, + "template_id": item.template_id, + "template_version_id": item.template_version_id, + "task_kind": item.task_kind, + "status": item.status, + "requested_by": item.requested_by, + "storage_key": item.storage_key, + "success_count": item.success_count, + "failed_count": item.failed_count, + "error_count": item.error_count, + "message": item.message, + "input_payload": item.input_payload, + "result_payload": item.result_payload, + "started_at": item.started_at, + "finished_at": item.finished_at, + "meta": item.meta, + "created_at": item.created_at, + "updated_at": item.updated_at, + } + + +def _plan_payload(plan: ExchangePlan): + return { + "template_kind": plan.template_kind, + "template_id": plan.template_id, + "version_id": plan.version_id, + "version": plan.version, + "bindings": _plan_schema_items(plan.bindings), + "fields": _plan_schema_items(plan.fields), + "placeholders": _plan_schema_items(plan.placeholders), + "title": plan.title, + "description": plan.description, + "sheet_name": plan.sheet_name, + "meta": plan.meta, + } + + +def _plan_schema_items(items): + return [asdict(item) for item in items] + + +def _binding_from_payload(item): + return ExchangeTemplateBinding( + entity=item.get("entity"), + template_kind=item.get("template_kind"), + handler=item.get("handler"), + description=item.get("description"), + default_sheet_name=item.get("default_sheet_name"), + default_file_name=item.get("default_file_name"), + title=item.get("title"), + meta=item.get("meta") or {}, + ) + + +def _field_from_payload(item): + return ExchangeField( + key=item.get("key"), + label=item.get("label"), + placeholder=item.get("placeholder"), + required=bool(item.get("required", False)), + example=item.get("example"), + width=item.get("width"), + format=item.get("format"), + source=item.get("source"), + target=item.get("target"), + options=tuple(tuple(option) for option in item.get("options") or []), + meta=item.get("meta") or {}, + ) + + +def _placeholder_from_payload(item): + return ExchangePlaceholder( + key=item.get("key"), + label=item.get("label"), + description=item.get("description"), + required=bool(item.get("required", False)), + example=item.get("example"), + ) + + +def _resolve_source(payload, request: Request, db: Session): + source_kind = payload.source_kind + source_name = getattr(payload, "source_name", None) + if source_name: + return get_exchange_source( + request.app, + source_name=source_name, + db=db, + service_name=getattr(payload, "source_service", "exchange"), + ) + if source_kind == ExchangeTemplateSourceKind.LOCAL: + return get_exchange_source(request.app, source_kind=source_kind, db=db) + if source_kind == ExchangeTemplateSourceKind.REMOTE: + return get_exchange_source( + request.app, + source_kind=source_kind, + service_name=payload.source_service or "exchange", + ) + if source_kind == ExchangeTemplateSourceKind.MAPPING or source_kind is None: + return get_exchange_source(request.app, source_kind=ExchangeTemplateSourceKind.MAPPING) + return get_exchange_source(request.app, source_kind=source_kind, db=db) + + +@router.get("/templates") +def list_templates(request: Request, db: Session = Depends(get_db)): + service = ExchangeService(request.app, db) + return ok( + [ + ExchangeTemplateResponse.model_validate(_template_payload(item)).model_dump(mode="json") + for item in service.list_templates() + ] + ) + + +@router.get("/templates/{template_id}") +def get_template(template_id: str, request: Request, db: Session = Depends(get_db)): + service = ExchangeService(request.app, db) + template = service.get_template_or_404(template_id) + return ok(ExchangeTemplateResponse.model_validate(_template_payload(template)).model_dump(mode="json")) + + +@router.post("/templates") +def create_template( + payload: ExchangeTemplateCreateRequest, + request: Request, + db: Session = Depends(get_db), +): + service = ExchangeService(request.app, db) + template = service.create_template( + code=payload.code, + name=payload.name, + template_kind=payload.template_kind, + entity=payload.entity, + description=payload.description, + meta=payload.meta, + ) + return ok(ExchangeTemplateResponse.model_validate(_template_payload(template)).model_dump(mode="json")) + + +@router.patch("/templates/{template_id}") +def update_template( + template_id: str, + payload: ExchangeTemplateUpdateRequest, + request: Request, + db: Session = Depends(get_db), +): + service = ExchangeService(request.app, db) + template = service.update_template( + template_id, + name=payload.name, + description=payload.description, + status=payload.status, + current_version=payload.current_version, + meta=payload.meta, + ) + return ok(ExchangeTemplateResponse.model_validate(_template_payload(template)).model_dump(mode="json")) + + +@router.get("/templates/{template_id}/versions") +def list_versions( + template_id: str, + request: Request, + db: Session = Depends(get_db), +): + service = ExchangeService(request.app, db) + return ok( + [ + ExchangeTemplateVersionResponse.model_validate(_version_payload(item)).model_dump(mode="json") + for item in service.list_versions(template_id) + ] + ) + + +@router.get("/templates/{template_id}/versions/{version_id}") +def get_version( + template_id: str, + version_id: str, + request: Request, + db: Session = Depends(get_db), +): + service = ExchangeService(request.app, db) + version = service.get_version_or_404(version_id) + if version.template_id != template_id: + raise BizError("模板版本不存在", code=404) + return ok(ExchangeTemplateVersionResponse.model_validate(_version_payload(version)).model_dump(mode="json")) + + +@router.get("/tasks") +def list_tasks(request: Request, db: Session = Depends(get_db)): + service = ExchangeService(request.app, db) + return ok( + [ + ExchangeTaskResponse.model_validate(_task_payload(item)).model_dump(mode="json") + for item in service.list_tasks() + ] + ) + + +@router.get("/tasks/{task_id}") +def get_task(task_id: str, request: Request, db: Session = Depends(get_db)): + service = ExchangeService(request.app, db) + task = service.get_task_or_404(task_id) + return ok(ExchangeTaskResponse.model_validate(_task_payload(task)).model_dump(mode="json")) + + +@router.post("/templates/{template_id}/versions") +def publish_version( + template_id: str, + payload: ExchangeTemplateVersionCreateRequest, + request: Request, + db: Session = Depends(get_db), +): + service = ExchangeService(request.app, db) + version = service.publish_version( + template_id=template_id, + version=payload.version, + bindings=[ + ExchangeTemplateBinding(**item.model_dump()) for item in payload.bindings + ], + fields=[ExchangeField(**item.model_dump()) for item in payload.fields], + placeholders=[ + ExchangePlaceholder(**item.model_dump()) for item in payload.placeholders + ], + meta=payload.meta, + make_current=payload.make_current, + ) + return ok(ExchangeTemplateVersionResponse.model_validate(_version_payload(version)).model_dump(mode="json")) + + +@router.get("/template-versions/{version_id}/download") +@raw_response +def download_template_version( + version_id: str, + request: Request, + db: Session = Depends(get_db), +): + service = ExchangeService(request.app, db) + content = service.build_template_file(version_id) + return StreamingResponse( + iter([content]), + media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", + headers={"Content-Disposition": 'attachment; filename="template.xlsx"'}, + ) + + +@router.post("/plans/resolve") +def resolve_plan( + payload: ExchangePlanResolveRequest, + request: Request, + db: Session = Depends(get_db), +): + service = ExchangeService(request.app, db) + source = _resolve_source(payload, request, db) + plan = service.resolve_plan( + template_kind=payload.template_kind, + template_id=payload.template_id, + version_id=payload.version_id, + version=payload.version, + bindings=[ExchangeTemplateBinding(**item.model_dump()) for item in payload.bindings], + fields=[ExchangeField(**item.model_dump()) for item in payload.fields], + placeholders=[ExchangePlaceholder(**item.model_dump()) for item in payload.placeholders], + title=payload.title, + description=payload.description, + sheet_name=payload.sheet_name, + meta=payload.meta, + source=source, + ) + return ok(_plan_payload(plan)) + + +@router.post("/plans/template-file") +@raw_response +def build_plan_template_file( + payload: ExchangePlanTemplateFileRequest, + request: Request, + db: Session = Depends(get_db), +): + service = ExchangeService(request.app, db) + source = _resolve_source(payload, request, db) + plan = service.resolve_plan( + template_kind=payload.template_kind, + template_id=payload.template_id, + version_id=payload.version_id, + version=payload.version, + bindings=[ExchangeTemplateBinding(**item.model_dump()) for item in payload.bindings], + fields=[ExchangeField(**item.model_dump()) for item in payload.fields], + placeholders=[ExchangePlaceholder(**item.model_dump()) for item in payload.placeholders], + title=payload.title, + description=payload.description, + sheet_name=payload.sheet_name, + meta=payload.meta, + source=source, + ) + content = source.load_template_file(plan) if source is not None else None + if content is None: + content = service.build_plan_template_file(plan) + return StreamingResponse( + iter([content]), + media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", + headers={"Content-Disposition": 'attachment; filename="template.xlsx"'}, + ) + + +@router.post("/templates/{template_id}/versions/upload") +def upload_template_version( + template_id: str, + version: str, + request: Request, + file: UploadFile = File(...), + db: Session = Depends(get_db), +): + service = ExchangeService(request.app, db) + content = file.file.read() + parsed = ExcelTemplateCodec().load(content) + snapshot = service.publish_version( + template_id=template_id, + version=version, + bindings=[_binding_from_payload(item) for item in parsed.get("bindings", [])], + fields=[_field_from_payload(item) for item in parsed.get("fields", [])], + placeholders=[ + _placeholder_from_payload(item) for item in parsed.get("placeholders", []) + ], + meta={ + "title": parsed.get("title"), + "description": parsed.get("description"), + "sheet_name": parsed.get("sheet_name"), + "source_file": file.filename, + }, + file_content=content, + file_name=file.filename, + ) + return ok(ExchangeTemplateVersionResponse.model_validate(_version_payload(snapshot)).model_dump(mode="json")) + + +@router.post("/tasks") +def create_task( + payload: ExchangeTaskCreateRequest, + request: Request, + db: Session = Depends(get_db), +): + service = ExchangeService(request.app, db) + source = _resolve_source(payload, request, db) + plan = service.resolve_plan( + template_kind=payload.task_kind, + template_id=payload.template_id, + version_id=payload.version_id, + version=payload.version, + bindings=[ExchangeTemplateBinding(**item.model_dump()) for item in payload.bindings], + fields=[ExchangeField(**item.model_dump()) for item in payload.fields], + placeholders=[ExchangePlaceholder(**item.model_dump()) for item in payload.placeholders], + title=payload.title, + description=payload.description, + sheet_name=payload.sheet_name, + meta=payload.meta, + source=source, + ) + task = service.create_task( + template_id=plan.template_id, + version_id=plan.version_id, + version=plan.version, + task_kind=payload.task_kind, + storage_key=payload.storage_key, + input_payload=payload.input_payload, + meta=payload.meta, + ) + return ok(ExchangeTaskResponse.model_validate(_task_payload(task)).model_dump(mode="json")) + + +@router.get("/tasks/{task_id}/rows") +def list_task_rows(task_id: str, request: Request, db: Session = Depends(get_db)): + service = ExchangeService(request.app, db) + return ok( + [ + { + "id": item.id, + "task_id": item.task_id, + "row_index": item.row_index, + "status": item.status, + "data": item.data, + "message": item.message, + "result": item.result, + } + for item in service.list_task_rows(task_id) + ] + ) diff --git a/iti/exchange/schemas.py b/iti/exchange/schemas.py new file mode 100644 index 0000000..23e23ed --- /dev/null +++ b/iti/exchange/schemas.py @@ -0,0 +1,152 @@ +from __future__ import annotations + +from datetime import datetime +from typing import Any + +from pydantic import AliasChoices, Field + +from iti.schemas import ItiModel + +from .base import ExchangeTemplateKind, ExchangeTemplateSourceKind, ExchangeTaskKind + + +class ExchangePlaceholderSchema(ItiModel): + key: str + label: str + description: str | None = None + required: bool = False + example: str | None = None + + +class ExchangeFieldSchema(ItiModel): + key: str + label: str + placeholder: str | None = None + required: bool = False + example: str | None = None + width: int | None = None + format: str | None = None + source: str | None = None + target: str | None = None + options: list[tuple[str, str]] = Field(default_factory=list) + meta: dict[str, Any] = Field(default_factory=dict) + + +class ExchangeTemplateBindingSchema(ItiModel): + entity: str + template_kind: ExchangeTemplateKind + handler: str | None = None + description: str | None = None + default_sheet_name: str | None = None + default_file_name: str | None = None + title: str | None = None + meta: dict[str, Any] = Field(default_factory=dict) + + +class ExchangeTemplateCreateRequest(ItiModel): + code: str + name: str + template_kind: ExchangeTemplateKind + entity: str + description: str | None = None + meta: dict[str, Any] = Field(default_factory=dict) + + +class ExchangeTemplateUpdateRequest(ItiModel): + name: str | None = None + description: str | None = None + status: str | None = None + current_version: str | None = None + meta: dict[str, Any] | None = None + + +class ExchangeTemplateVersionCreateRequest(ItiModel): + version: str + bindings: list[ExchangeTemplateBindingSchema] = Field(default_factory=list) + fields: list[ExchangeFieldSchema] = Field(default_factory=list) + placeholders: list[ExchangePlaceholderSchema] = Field(default_factory=list) + meta: dict[str, Any] = Field(default_factory=dict) + make_current: bool = True + + +class ExchangePlanRequest(ItiModel): + template_id: str | None = None + version_id: str | None = None + version: str | None = None + source_kind: ExchangeTemplateSourceKind | None = None + source_name: str | None = None + source_service: str | None = None + bindings: list[ExchangeTemplateBindingSchema] = Field(default_factory=list) + fields: list[ExchangeFieldSchema] = Field(default_factory=list) + placeholders: list[ExchangePlaceholderSchema] = Field(default_factory=list) + title: str | None = None + description: str | None = None + sheet_name: str | None = None + meta: dict[str, Any] = Field(default_factory=dict) + + +class ExchangePlanResolveRequest(ExchangePlanRequest): + template_kind: ExchangeTemplateKind = Field( + validation_alias=AliasChoices("templateKind", "taskKind") + ) + + +class ExchangePlanTemplateFileRequest(ExchangePlanResolveRequest): + pass + + +class ExchangeTaskCreateRequest(ExchangePlanRequest): + task_kind: ExchangeTaskKind + storage_key: str | None = None + input_payload: dict[str, Any] = Field(default_factory=dict) + + +class ExchangeTemplateResponse(ItiModel): + id: str + code: str + name: str + template_kind: str + entity: str + status: str + description: str | None = None + current_version: str | None = None + meta: dict[str, Any] = Field(default_factory=dict) + created_at: datetime + updated_at: datetime + + +class ExchangeTemplateVersionResponse(ItiModel): + id: str + template_id: str + version: str + template_kind: str + published_at: datetime | None = None + file_key: str | None = None + checksum: str | None = None + bindings: list[dict[str, Any]] = Field(default_factory=list) + fields: list[dict[str, Any]] = Field(default_factory=list) + placeholders: list[dict[str, Any]] = Field(default_factory=list) + meta: dict[str, Any] = Field(default_factory=dict) + created_at: datetime + updated_at: datetime + + +class ExchangeTaskResponse(ItiModel): + id: str + template_id: str | None = None + template_version_id: str | None = None + task_kind: str + status: str + requested_by: str | None = None + storage_key: str | None = None + success_count: int + failed_count: int + error_count: int + message: str | None = None + input_payload: dict[str, Any] = Field(default_factory=dict) + result_payload: dict[str, Any] = Field(default_factory=dict) + started_at: datetime | None = None + finished_at: datetime | None = None + meta: dict[str, Any] = Field(default_factory=dict) + created_at: datetime + updated_at: datetime diff --git a/iti/exchange/service.py b/iti/exchange/service.py new file mode 100644 index 0000000..f628ec9 --- /dev/null +++ b/iti/exchange/service.py @@ -0,0 +1,513 @@ +from __future__ import annotations + +import hashlib +from dataclasses import asdict +from datetime import datetime +from io import BytesIO +from enum import Enum +from typing import Any + +from sqlalchemy import select +from sqlalchemy.orm import Session + +from iti.exceptions import BizError + +from .base import ( + ExchangeField, + ExchangePlaceholder, + ExchangeTemplateBinding, + ExchangeTemplateKind, + ExchangePlan, + ExchangeTemplateSnapshot, + ExchangeTaskKind, +) +from .excel import ExcelTemplateCodec, ExcelWorkbookCodec +from .models import ( + ExchangeTaskModel, + ExchangeTaskRowModel, + ExchangeTemplateModel, + ExchangeTemplateVersionModel, +) +from .tasks import get_exchange_storage + + +class ExchangeService: + def __init__(self, app, db: Session) -> None: + self.app = app + self.db = db + + def create_template( + self, + *, + code: str, + name: str, + template_kind: ExchangeTemplateKind | str, + entity: str, + description: str | None = None, + meta: dict[str, Any] | None = None, + ) -> ExchangeTemplateModel: + template = ExchangeTemplateModel( + code=code, + name=name, + template_kind=_enum_value(template_kind), + entity=entity, + description=description, + meta=meta or {}, + ) + self.db.add(template) + self.db.commit() + self.db.refresh(template) + return template + + def update_template( + self, + template_id: str, + *, + name: str | None = None, + description: str | None = None, + status: str | None = None, + current_version: str | None = None, + meta: dict[str, Any] | None = None, + ) -> ExchangeTemplateModel: + template = self.get_template_or_404(template_id) + if name is not None: + template.name = name + if description is not None: + template.description = description + if status is not None: + template.status = status + if current_version is not None: + template.current_version = current_version + if meta is not None: + template.meta = meta + self.db.commit() + self.db.refresh(template) + return template + + def publish_version( + self, + *, + template_id: str, + version: str, + bindings: list[ExchangeTemplateBinding] | None = None, + fields: list[ExchangeField] | None = None, + placeholders: list[ExchangePlaceholder] | None = None, + file_content: bytes | None = None, + file_name: str | None = None, + meta: dict[str, Any] | None = None, + make_current: bool = True, + ) -> ExchangeTemplateVersionModel: + template = self.get_template_or_404(template_id) + file_key = None + checksum = None + if file_content is not None: + file_key = self.save_template_file( + template=template, + version=version, + content=file_content, + file_name=file_name, + ) + checksum = hashlib.sha256(file_content).hexdigest() + + snapshot = ExchangeTemplateVersionModel( + template_id=template.id, + version=version, + template_kind=template.template_kind, + published_at=datetime.now(), + file_key=file_key, + checksum=checksum, + bindings=[_jsonable(asdict(item)) for item in bindings or []], + fields=[_jsonable(asdict(item)) for item in fields or []], + placeholders=[_jsonable(asdict(item)) for item in placeholders or []], + meta=meta or {}, + ) + self.db.add(snapshot) + if make_current: + template.current_version = version + template.status = "published" + self.db.commit() + self.db.refresh(snapshot) + return snapshot + + def build_template_file(self, version_id: str) -> bytes: + version = self.get_version_or_404(version_id) + if version.file_key: + storage = get_exchange_storage(self.app) + with storage.download(version.file_key) as file_stream: + return file_stream.read() + snapshot = self.snapshot_from_model(version) + return ExcelTemplateCodec().dump(snapshot) + + def build_plan_template_file(self, plan: ExchangePlan) -> bytes: + if plan.version_id: + version = self.get_snapshot_by_version_id(plan.version_id) + if version is not None and version.file_key: + storage = get_exchange_storage(self.app) + with storage.download(version.file_key) as file_stream: + return file_stream.read() + return ExcelTemplateCodec().dump(plan) + + def export_rows( + self, + rows: list[dict[str, Any]], + *, + plan: ExchangePlan | None = None, + fields: list[ExchangeField] | None = None, + sheet_name: str | None = None, + ) -> bytes: + workbook_codec = ExcelWorkbookCodec() + if plan is not None: + return workbook_codec.export_rows_with_plan( + plan=plan, + rows=rows, + sheet_name=sheet_name, + ) + if fields is not None: + return workbook_codec.export_rows_with_template( + fields=fields, + rows=rows, + sheet_name=sheet_name or "Export", + ) + if not rows: + return workbook_codec.export_rows([], [], sheet_name=sheet_name or "Export") + headers = list(rows[0].keys()) + return workbook_codec.export_rows(headers, rows, sheet_name=sheet_name or "Export") + + def import_rows( + self, + content: bytes, + *, + plan: ExchangePlan | None = None, + fields: list[ExchangeField] | None = None, + ) -> list[dict[str, Any]]: + workbook_codec = ExcelWorkbookCodec() + if plan is not None and plan.fields: + return workbook_codec.import_rows_with_fields(content, fields=list(plan.fields)) + if fields is not None: + return workbook_codec.import_rows_with_fields(content, fields=fields) + return workbook_codec.import_rows(content) + + def save_template_file( + self, + *, + template: ExchangeTemplateModel, + version: str, + content: bytes, + file_name: str | None = None, + ) -> str: + suffix = _safe_suffix(file_name or "template.xlsx") + key = f"exchange/templates/{template.code}/{version}/{hashlib.sha256(content).hexdigest()}.{suffix}" + storage = get_exchange_storage(self.app) + storage.upload(BytesIO(content), key, _excel_mime_type()) + return key + + def create_task( + self, + *, + template_id: str | None = None, + version_id: str | None = None, + version: str | None = None, + task_kind: ExchangeTaskKind | str, + requested_by: str | None = None, + storage_key: str | None = None, + input_payload: dict[str, Any] | None = None, + meta: dict[str, Any] | None = None, + ) -> ExchangeTaskModel: + template = self.get_template_or_404(template_id) if template_id else None + version_model = self.get_version_or_404(version_id) if version_id else None + if template is not None and version_model is not None and version_model.template_id != template.id: + raise BizError("模板版本不属于该模板", code=400) + if template is None and version_model is not None: + template = self.get_template_or_404(version_model.template_id) + if template is not None and version_model is None: + if version: + version_model = self.get_snapshot(template_id=template.id, version=version) + elif template.current_version: + version_model = self.get_snapshot( + template_id=template.id, + version=template.current_version, + ) + task = ExchangeTaskModel( + template_id=template.id if template is not None else None, + template_version_id=version_model.id if version_model is not None else None, + task_kind=_enum_value(task_kind), + status="pending", + requested_by=requested_by, + storage_key=storage_key, + input_payload=input_payload or {}, + meta=meta or {}, + ) + self.db.add(task) + self.db.commit() + self.db.refresh(task) + return task + + def get_snapshot(self, *, template_id: str, version: str) -> ExchangeTemplateSnapshot | None: + version_model = self.db.scalar( + select(ExchangeTemplateVersionModel) + .where(ExchangeTemplateVersionModel.template_id == template_id) + .where(ExchangeTemplateVersionModel.version == version) + ) + if version_model is None: + return None + return self.snapshot_from_model(version_model) + + def get_snapshot_by_version_id(self, version_id: str) -> ExchangeTemplateSnapshot | None: + version_model = self.db.get(ExchangeTemplateVersionModel, version_id) + if version_model is None: + return None + return self.snapshot_from_model(version_model) + + def get_current_snapshot(self, template_id: str) -> ExchangeTemplateSnapshot | None: + template = self.db.get(ExchangeTemplateModel, template_id) + if template is None or not template.current_version: + return None + return self.get_snapshot(template_id=template_id, version=template.current_version) + + def resolve_plan( + self, + *, + template_kind: ExchangeTemplateKind | str, + template_id: str | None = None, + version_id: str | None = None, + version: str | None = None, + bindings: list[ExchangeTemplateBinding] | None = None, + fields: list[ExchangeField] | None = None, + placeholders: list[ExchangePlaceholder] | None = None, + title: str | None = None, + description: str | None = None, + sheet_name: str | None = None, + meta: dict[str, Any] | None = None, + source: Any | None = None, + ) -> ExchangePlan: + if source is not None: + return source.resolve_plan( + template_kind=template_kind, + template_id=template_id, + version_id=version_id, + version=version, + bindings=bindings, + fields=fields, + placeholders=placeholders, + title=title, + description=description, + sheet_name=sheet_name, + meta=meta, + ) + if version_id: + snapshot = self.get_snapshot_by_version_id(version_id) + if snapshot is not None: + return snapshot.to_plan() + if template_id and version: + snapshot = self.get_snapshot(template_id=template_id, version=version) + if snapshot is not None: + return snapshot.to_plan() + if template_id: + current = self.get_current_snapshot(template_id) + if current is not None: + return current.to_plan() + return ExchangePlan.from_mapping( + template_kind=template_kind, + template_id=template_id, + version_id=version_id, + version=version, + bindings=bindings, + fields=fields, + placeholders=placeholders, + title=title, + description=description, + sheet_name=sheet_name, + meta=meta, + ) + + def mark_task_running(self, task_id: str) -> ExchangeTaskModel: + task = self.get_task_or_404(task_id) + task.status = "running" + task.started_at = datetime.now() + self.db.commit() + self.db.refresh(task) + return task + + def mark_task_finished( + self, + task_id: str, + *, + status: str = "success", + message: str | None = None, + result_payload: dict[str, Any] | None = None, + success_count: int | None = None, + failed_count: int | None = None, + ) -> ExchangeTaskModel: + task = self.get_task_or_404(task_id) + task.status = status + task.message = message + task.finished_at = datetime.now() + if result_payload is not None: + task.result_payload = result_payload + if success_count is not None: + task.success_count = success_count + if failed_count is not None: + task.failed_count = failed_count + task.error_count = failed_count + self.db.commit() + self.db.refresh(task) + return task + + def add_task_row( + self, + *, + task_id: str, + row_index: int, + status: str, + data: dict[str, Any] | None = None, + message: str | None = None, + result: dict[str, Any] | None = None, + ) -> ExchangeTaskRowModel: + row = ExchangeTaskRowModel( + task_id=task_id, + row_index=row_index, + status=status, + data=data or {}, + message=message, + result=result or {}, + ) + self.db.add(row) + self.db.commit() + self.db.refresh(row) + return row + + def get_template_or_404(self, template_id: str) -> ExchangeTemplateModel: + template = self.db.get(ExchangeTemplateModel, template_id) + if template is None: + raise BizError("模板不存在", code=404) + return template + + def get_version_or_404(self, version_id: str) -> ExchangeTemplateVersionModel: + version = self.db.get(ExchangeTemplateVersionModel, version_id) + if version is None: + raise BizError("模板版本不存在", code=404) + return version + + def get_task_or_404(self, task_id: str) -> ExchangeTaskModel: + task = self.db.get(ExchangeTaskModel, task_id) + if task is None: + raise BizError("导入导出任务不存在", code=404) + return task + + def list_templates(self) -> list[ExchangeTemplateModel]: + return list( + self.db.scalars( + select(ExchangeTemplateModel).order_by( + ExchangeTemplateModel.entity, + ExchangeTemplateModel.code, + ) + ) + ) + + def list_versions(self, template_id: str) -> list[ExchangeTemplateVersionModel]: + return list( + self.db.scalars( + select(ExchangeTemplateVersionModel) + .where(ExchangeTemplateVersionModel.template_id == template_id) + .order_by(ExchangeTemplateVersionModel.version) + ) + ) + + def list_tasks(self, template_id: str | None = None) -> list[ExchangeTaskModel]: + statement = select(ExchangeTaskModel).order_by(ExchangeTaskModel.created_at.desc()) + if template_id is not None: + statement = statement.where(ExchangeTaskModel.template_id == template_id) + return list(self.db.scalars(statement)) + + def list_task_rows(self, task_id: str) -> list[ExchangeTaskRowModel]: + return list( + self.db.scalars( + select(ExchangeTaskRowModel) + .where(ExchangeTaskRowModel.task_id == task_id) + .order_by(ExchangeTaskRowModel.row_index) + ) + ) + + def snapshot_from_model( + self, version: ExchangeTemplateVersionModel + ) -> ExchangeTemplateSnapshot: + return ExchangeTemplateSnapshot( + id=version.id, + version=version.version, + template_id=version.template_id, + template_kind=ExchangeTemplateKind(version.template_kind), + bindings=tuple(_binding_from_dict(item) for item in version.bindings), + published_at=version.published_at.isoformat() if version.published_at else None, + file_key=version.file_key, + checksum=version.checksum, + fields=tuple(_field_from_dict(item) for item in version.fields), + placeholders=tuple(_placeholder_from_dict(item) for item in version.placeholders), + meta=version.meta, + ) + + +def _field_from_dict(value: dict[str, Any]) -> ExchangeField: + options = value.get("options") or () + return ExchangeField( + key=value["key"], + label=value["label"], + placeholder=value.get("placeholder"), + required=bool(value.get("required", False)), + example=value.get("example"), + width=value.get("width"), + format=value.get("format"), + source=value.get("source"), + target=value.get("target"), + options=tuple(tuple(item) for item in options), + meta=value.get("meta") or {}, + ) + + +def _placeholder_from_dict(value: dict[str, Any]) -> ExchangePlaceholder: + return ExchangePlaceholder( + key=value["key"], + label=value["label"], + description=value.get("description"), + required=bool(value.get("required", False)), + example=value.get("example"), + ) + + +def _binding_from_dict(value: dict[str, Any]) -> ExchangeTemplateBinding: + return ExchangeTemplateBinding( + entity=value["entity"], + template_kind=ExchangeTemplateKind(value["template_kind"]), + handler=value.get("handler"), + description=value.get("description"), + default_sheet_name=value.get("default_sheet_name"), + default_file_name=value.get("default_file_name"), + title=value.get("title"), + meta=value.get("meta") or {}, + ) + + +def _enum_value(value: Any) -> str: + return value.value if hasattr(value, "value") else str(value) + + +def _safe_suffix(file_name: str) -> str: + if "." not in file_name: + return "xlsx" + suffix = file_name.rsplit(".", 1)[-1].lower() + return "".join(ch for ch in suffix if ch.isalnum()) or "xlsx" + + +def _excel_mime_type() -> str: + return "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet" + + +def _jsonable(value: Any) -> Any: + if isinstance(value, Enum): + return value.value + if isinstance(value, dict): + return {key: _jsonable(item) for key, item in value.items()} + if isinstance(value, list): + return [_jsonable(item) for item in value] + if isinstance(value, tuple): + return [_jsonable(item) for item in value] + return value diff --git a/iti/exchange/sources.py b/iti/exchange/sources.py new file mode 100644 index 0000000..348a7ad --- /dev/null +++ b/iti/exchange/sources.py @@ -0,0 +1,286 @@ +from __future__ import annotations + +from dataclasses import dataclass +from typing import Any, Protocol + +from iti.service_client import ServiceClient, service_client + +from .base import ( + ExchangeField, + ExchangePlaceholder, + ExchangePlan, + ExchangeTemplateBinding, + ExchangeTemplateKind, + ExchangeTemplateSource, + ExchangeTemplateSourceKind, +) +from .excel import ExcelTemplateCodec + + +class ExchangeSource(Protocol): + def resolve_plan( + self, + *, + template_kind: ExchangeTemplateKind | str, + template_id: str | None = None, + version_id: str | None = None, + version: str | None = None, + bindings: list[ExchangeTemplateBinding] | None = None, + fields: list[ExchangeField] | None = None, + placeholders: list[ExchangePlaceholder] | None = None, + title: str | None = None, + description: str | None = None, + sheet_name: str | None = None, + meta: dict[str, Any] | None = None, + source: ExchangeTemplateSource | None = None, + ) -> ExchangePlan: + ... + + def load_template_file(self, plan: ExchangePlan) -> bytes | None: + ... + + +@dataclass +class MappingExchangeSource: + def resolve_plan( + self, + *, + template_kind: ExchangeTemplateKind | str, + template_id: str | None = None, + version_id: str | None = None, + version: str | None = None, + bindings: list[ExchangeTemplateBinding] | None = None, + fields: list[ExchangeField] | None = None, + placeholders: list[ExchangePlaceholder] | None = None, + title: str | None = None, + description: str | None = None, + sheet_name: str | None = None, + meta: dict[str, Any] | None = None, + source: ExchangeTemplateSource | None = None, + ) -> ExchangePlan: + if source is not None: + return source.to_plan() + return ExchangePlan.from_mapping( + template_kind=template_kind, + template_id=template_id, + version_id=version_id, + version=version, + bindings=bindings, + fields=fields, + placeholders=placeholders, + title=title, + description=description, + sheet_name=sheet_name, + meta=meta, + ) + + def load_template_file(self, plan: ExchangePlan) -> bytes | None: + return ExcelTemplateCodec().dump(plan) + + +@dataclass +class LocalExchangeSource: + app: Any + db: Any + + def resolve_plan( + self, + *, + template_kind: ExchangeTemplateKind | str, + template_id: str | None = None, + version_id: str | None = None, + version: str | None = None, + bindings: list[ExchangeTemplateBinding] | None = None, + fields: list[ExchangeField] | None = None, + placeholders: list[ExchangePlaceholder] | None = None, + title: str | None = None, + description: str | None = None, + sheet_name: str | None = None, + meta: dict[str, Any] | None = None, + source: ExchangeTemplateSource | None = None, + ) -> ExchangePlan: + from .service import ExchangeService + + service = ExchangeService(self.app, self.db) + if source is not None: + return source.to_plan() + if version_id: + snapshot = service.get_snapshot_by_version_id(version_id) + if snapshot is not None: + return snapshot.to_plan() + if template_id: + snapshot = service.get_current_snapshot(template_id) + if snapshot is not None: + return snapshot.to_plan() + return ExchangePlan.from_mapping( + template_kind=template_kind, + template_id=template_id, + version_id=version_id, + version=version, + bindings=bindings, + fields=fields, + placeholders=placeholders, + title=title, + description=description, + sheet_name=sheet_name, + meta=meta, + ) + + def load_template_file(self, plan: ExchangePlan) -> bytes | None: + if plan.version_id: + from .service import ExchangeService + + return ExchangeService(self.app, self.db).build_template_file(plan.version_id) + return ExcelTemplateCodec().dump(plan) + + +@dataclass +class RemoteExchangeSource: + app: Any + service_name: str = "exchange" + + def resolve_plan( + self, + *, + template_kind: ExchangeTemplateKind | str, + template_id: str | None = None, + version_id: str | None = None, + version: str | None = None, + bindings: list[ExchangeTemplateBinding] | None = None, + fields: list[ExchangeField] | None = None, + placeholders: list[ExchangePlaceholder] | None = None, + title: str | None = None, + description: str | None = None, + sheet_name: str | None = None, + meta: dict[str, Any] | None = None, + source: ExchangeTemplateSource | None = None, + ) -> ExchangePlan: + if source is not None: + return source.to_plan() + client = service_client(self.app, self.service_name) + payload = self._fetch_plan(client, template_id=template_id, version=version, version_id=version_id) + if payload is not None: + meta = payload.get("meta") or {} + return ExchangePlan.from_mapping( + template_kind=payload.get("template_kind") or template_kind, + template_id=payload.get("template_id") or template_id, + version_id=payload.get("id") or version_id, + version=payload.get("version") or version, + bindings=[_binding_from_mapping(item) for item in payload.get("bindings", [])], + fields=[_field_from_mapping(item) for item in payload.get("fields", [])], + placeholders=[ + _placeholder_from_mapping(item) for item in payload.get("placeholders", []) + ], + title=payload.get("title") or meta.get("title") or title, + description=payload.get("description") or meta.get("description") or description, + sheet_name=payload.get("sheet_name") or meta.get("sheet_name") or sheet_name, + meta=meta or payload.get("meta") or {}, + ) + return ExchangePlan.from_mapping( + template_kind=template_kind, + template_id=template_id, + version_id=version_id, + version=version, + bindings=bindings, + fields=fields, + placeholders=placeholders, + title=title, + description=description, + sheet_name=sheet_name, + meta=meta, + ) + + def load_template_file(self, plan: ExchangePlan) -> bytes | None: + if not plan.version_id: + return ExcelTemplateCodec().dump(plan) + client = service_client(self.app, self.service_name) + response = client.get( + f"/exchange/template-versions/{plan.version_id}/download", + expect_json=False, + ) + return response.content + + def _fetch_plan( + self, + client: ServiceClient, + *, + template_id: str | None, + version: str | None, + version_id: str | None, + ) -> dict[str, Any] | None: + if version_id: + return client.get(f"/exchange/template-versions/{version_id}") + if template_id and version: + return client.get(f"/exchange/templates/{template_id}/versions/{version}") + if template_id: + template = client.get(f"/exchange/templates/{template_id}") + current_version = (template or {}).get("current_version") + if current_version: + return client.get(f"/exchange/templates/{template_id}/versions/{current_version}") + return None + + +def _binding_from_mapping(item: dict[str, Any]) -> ExchangeTemplateBinding: + return ExchangeTemplateBinding( + entity=item.get("entity"), + template_kind=item.get("template_kind") or item.get("templateKind"), + handler=item.get("handler"), + description=item.get("description"), + default_sheet_name=item.get("default_sheet_name") or item.get("defaultSheetName"), + default_file_name=item.get("default_file_name") or item.get("defaultFileName"), + title=item.get("title"), + meta=item.get("meta") or {}, + ) + + +def _field_from_mapping(item: dict[str, Any]) -> ExchangeField: + return ExchangeField( + key=item.get("key"), + label=item.get("label"), + placeholder=item.get("placeholder"), + required=bool(item.get("required", False)), + example=item.get("example"), + width=item.get("width"), + format=item.get("format"), + source=item.get("source"), + target=item.get("target"), + options=tuple(tuple(option) for option in item.get("options") or []), + meta=item.get("meta") or {}, + ) + + +def _placeholder_from_mapping(item: dict[str, Any]) -> ExchangePlaceholder: + return ExchangePlaceholder( + key=item.get("key"), + label=item.get("label"), + description=item.get("description"), + required=bool(item.get("required", False)), + example=item.get("example"), + ) + + +def get_exchange_source( + app: Any, + *, + source_kind: ExchangeTemplateSourceKind | str = ExchangeTemplateSourceKind.MAPPING, + source_name: str | None = None, + db: Any | None = None, + service_name: str = "exchange", +) -> ExchangeSource: + if source_name: + from .registry import get_exchange_registry + + source = get_exchange_registry(app).get_source(source_name) + if source is None: + raise ValueError(f"exchange source not registered: {source_name}") + return source # type: ignore[return-value] + kind = ExchangeTemplateSourceKind(source_kind) + if kind == ExchangeTemplateSourceKind.LOCAL: + if db is None: + raise ValueError("local exchange source requires db") + return LocalExchangeSource(app=app, db=db) + if kind == ExchangeTemplateSourceKind.REMOTE: + return RemoteExchangeSource(app=app, service_name=service_name) + if kind == ExchangeTemplateSourceKind.CUSTOM: + raise ValueError("custom exchange source requires source_name") + return MappingExchangeSource() diff --git a/iti/exchange/tasks.py b/iti/exchange/tasks.py new file mode 100644 index 0000000..03f83c1 --- /dev/null +++ b/iti/exchange/tasks.py @@ -0,0 +1,59 @@ +from __future__ import annotations + +from dataclasses import dataclass +from typing import Any + +from iti.storage import StorageManager +from iti.tasks import task_registry + +from .base import ExchangeTaskKind + + +@dataclass(frozen=True) +class ExchangeTaskContext: + task_kind: ExchangeTaskKind + template_id: str + version: str + storage_key: str | None = None + payload: dict[str, Any] | None = None + + +@dataclass(frozen=True) +class ExchangeTaskResult: + success_count: int = 0 + failed_count: int = 0 + message: str | None = None + result_payload: dict[str, Any] | None = None + + +def register_exchange_task( + *, + name: str, + handler, + schedule: str | None = None, + description: str | None = None, +): + return task_registry.register( + name=name, + handler=handler, + schedule=schedule, + description=description, + ) + + +def wrap_exchange_task(handler): + def runner(context: ExchangeTaskContext): + return handler(context) + + return runner + + +def get_exchange_storage(app): + base_config = dict(getattr(app.state.config, "file_storage", {}) or {}) + exchange_config = dict(getattr(app.state.config, "exchange_storage", {}) or {}) + base_config.update(exchange_config) + default_storage = getattr(app.state.config, "exchange_default_storage", None) + if default_storage and "DEFAULT_STORAGE_TYPE" not in exchange_config: + base_config["DEFAULT_STORAGE_TYPE"] = default_storage + base_dir = getattr(app.state.config, "base_dir", None) + return StorageManager.get_storage(config=base_config, base_dir=base_dir) diff --git a/migrations/env.py b/migrations/env.py index 9c15c99..b9289ba 100644 --- a/migrations/env.py +++ b/migrations/env.py @@ -8,6 +8,7 @@ from sqlalchemy import engine_from_config, pool from iti.config import get_config from iti.db import Base +from iti.exchange import models as _exchange_models config = context.config diff --git a/tests/test_exchange.py b/tests/test_exchange.py new file mode 100644 index 0000000..3f89183 --- /dev/null +++ b/tests/test_exchange.py @@ -0,0 +1,360 @@ +from __future__ import annotations + +from io import BytesIO +from dataclasses import dataclass + +import httpx +from fastapi.testclient import TestClient + +from iti import create_app +from iti.config import BaseConfig +from iti.db import Base, reset_db +from iti.exchange import ( + ExchangeField, + ExchangePlaceholder, + ExchangePlan, + ExchangeTemplateBinding, + ExchangeTemplateKind, + ExchangeTemplateSource, + ExchangeTemplateSourceKind, + LocalExchangeSource, + MappingExchangeSource, + RemoteExchangeSource, + get_exchange_registry, + register_exchange_source, +) +from iti.exchange.excel import ExcelTemplateCodec, ExcelWorkbookCodec +from iti.exchange.service import ExchangeService +from iti.exchange.base import ExchangeTemplateSnapshot +from iti.service_client import register_service_client + + +def make_app(*, exchange_enabled: bool = True): + config = BaseConfig( + database_url="sqlite+pysqlite:///:memory:", + testing=True, + exchange_enabled=exchange_enabled, + ) + app = create_app(config_mapping=config) + Base.metadata.create_all(app.state.db_engine) + return app + + +def test_exchange_module_is_auto_registered(): + app = make_app() + + assert app.state.iti_modules.get("exchange") is not None + assert "exchange:template:list" in app.state.iti_modules.permissions + + +def test_template_version_workbook_roundtrip(): + snapshot = ExchangeTemplateSnapshot( + id="v1", + version="1.0.0", + template_id="tpl1", + template_kind="import", + bindings=( + ExchangeTemplateBinding(entity="order", template_kind="import", title="订单"), + ), + fields=( + ExchangeField(key="name", label="名称", placeholder="{{name}}", source="name"), + ), + placeholders=( + ExchangePlaceholder(key="tenant", label="租户", example="demo"), + ), + meta={"title": "订单模板", "sheet_name": "模板"}, + ) + codec = ExcelTemplateCodec() + content = codec.dump(snapshot) + parsed = codec.load(content) + + assert parsed["title"] == "订单模板" + assert parsed["sheet_name"] == "模板" + assert parsed["bindings"][0]["entity"] == "order" + assert parsed["fields"][0]["key"] == "name" + assert parsed["placeholders"][0]["key"] == "tenant" + + +def test_exchange_service_create_publish_and_task_flow(): + reset_db() + app = make_app() + service = ExchangeService(app, app.state.db_sessionmaker()) + + template = service.create_template( + code="order", + name="订单", + template_kind="import", + entity="order", + ) + assert template.code == "order" + + version = service.publish_version( + template_id=template.id, + version="1.0.0", + bindings=[ + ExchangeTemplateBinding(entity="order", template_kind="import", title="订单") + ], + fields=[ExchangeField(key="name", label="名称", source="name")], + placeholders=[ExchangePlaceholder(key="tenant", label="租户")], + ) + assert version.version == "1.0.0" + + task = service.create_task( + template_id=template.id, + version_id=version.id, + task_kind="import", + input_payload={"source": "upload"}, + ) + assert task.task_kind == "import" + assert task.template_version_id == version.id + + workbook = ExcelWorkbookCodec().export_rows_with_template( + fields=[ExchangeField(key="name", label="名称", source="name")], + rows=[{"name": "A"}], + sheet_name="导出", + ) + rows = ExcelWorkbookCodec().import_rows(workbook) + assert rows[0]["名称"] == "A" + + +def test_mapping_source_supports_template_less_workbook_roundtrip(): + source = MappingExchangeSource() + plan = source.resolve_plan( + template_kind="import", + fields=[ + ExchangeField(key="name", label="名称", placeholder="{{name}}", source="name"), + ExchangeField(key="age", label="年龄", source="age"), + ], + placeholders=[ExchangePlaceholder(key="tenant", label="租户")], + title="映射模板", + sheet_name="映射", + ) + + codec = ExcelTemplateCodec() + workbook = codec.dump(plan) + parsed = codec.load(workbook) + + assert parsed["title"] == "映射模板" + assert parsed["sheet_name"] == "映射" + assert parsed["fields"][0]["source"] == "name" + rows = ExcelWorkbookCodec().export_rows_with_plan( + plan=plan, + rows=[{"name": "Alice", "age": 18}], + ) + imported = ExcelWorkbookCodec().import_rows_with_fields(rows, fields=list(plan.fields)) + assert imported[0]["name"] == "Alice" + assert imported[0]["age"] == 18 + + +def test_excel_workbook_codec_roundtrip_uses_pandas_and_preserves_empty_cells(): + codec = ExcelWorkbookCodec() + content = codec.export_rows( + headers=["名称", "年龄"], + rows=[{"名称": "Alice", "年龄": None}], + sheet_name="导出", + ) + + rows = codec.import_rows(content) + + assert rows == [{"名称": "Alice", "年龄": None}] + + +def test_excel_workbook_codec_with_fields_maps_headers_to_target_keys(): + codec = ExcelWorkbookCodec() + fields = [ + ExchangeField(key="name", label="名称", source="name"), + ExchangeField(key="age", label="年龄", target="age_import"), + ] + content = codec.export_rows_with_template( + fields=fields, + rows=[{"name": "Alice", "age": 18}], + sheet_name="映射", + ) + + rows = codec.import_rows_with_fields(content, fields=fields) + + assert rows == [{"name": "Alice", "age_import": 18}] + + +def test_remote_source_resolves_plan_and_template_bytes(): + app = create_app(config_mapping=BaseConfig(database_url="sqlite+pysqlite:///:memory:", testing=True)) + + def handler(request: httpx.Request) -> httpx.Response: + if request.url.path == "/exchange/template-versions/v1": + return httpx.Response( + 200, + json={ + "data": { + "id": "v1", + "template_id": "tpl1", + "version": "1.0.0", + "template_kind": "import", + "bindings": [ + {"entity": "order", "template_kind": "import", "title": "订单"} + ], + "fields": [ + {"key": "name", "label": "名称", "source": "name"} + ], + "placeholders": [ + {"key": "tenant", "label": "租户"} + ], + "meta": {"title": "订单模板", "sheet_name": "模板"}, + }, + "code": 200, + "message": "成功", + }, + ) + if request.url.path == "/exchange/template-versions/v1/download": + content = ExcelTemplateCodec().dump( + ExchangePlan.from_mapping( + template_kind=ExchangeTemplateKind.IMPORT, + template_id="tpl1", + version_id="v1", + version="1.0.0", + fields=[ExchangeField(key="name", label="名称", source="name")], + title="订单模板", + sheet_name="模板", + ) + ) + return httpx.Response( + 200, + content=content, + headers={ + "Content-Type": "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet" + }, + ) + raise AssertionError(f"unexpected path: {request.url.path}") + + register_service_client( + app, + "template_center", + {"base_url": "https://template-center.local"}, + transport=httpx.MockTransport(handler), + ) + source = RemoteExchangeSource(app, service_name="template_center") + + plan = source.resolve_plan(template_kind="import", version_id="v1") + assert plan.template_id == "tpl1" + assert plan.title == "订单模板" + assert plan.fields[0].source == "name" + + content = source.load_template_file(plan) + assert content is not None + assert len(content) > 0 + + +def test_custom_registered_source_can_drive_plan_and_file(): + @dataclass + class CustomSource: + plan: ExchangePlan + content: bytes + + def resolve_plan(self, **kwargs): + return self.plan + + def load_template_file(self, plan: ExchangePlan) -> bytes | None: + return self.content + + app = make_app() + custom_plan = ExchangePlan.from_mapping( + template_kind="import", + template_id="tpl-custom", + version_id="v-custom", + version="1.0.0", + fields=[ExchangeField(key="name", label="名称", source="name")], + title="自定义模板", + ) + register_exchange_source( + app, + "custom-center", + CustomSource(plan=custom_plan, content=b"custom-template"), + ) + + assert get_exchange_registry(app).get_source("custom-center") is not None + + client = TestClient(app) + response = client.post( + "/exchange/plans/resolve", + json={ + "templateKind": "import", + "sourceName": "custom-center", + }, + ) + assert response.status_code == 200 + assert response.json()["data"]["template_id"] == "tpl-custom" + + file_resp = client.post( + "/exchange/plans/template-file", + json={ + "templateKind": "import", + "sourceName": "custom-center", + }, + ) + assert file_resp.status_code == 200 + assert file_resp.content == b"custom-template" + + +def test_exchange_routes_are_available(): + app = make_app() + client = TestClient(app) + + created = client.post( + "/exchange/templates", + json={ + "code": "order", + "name": "订单", + "template_kind": "import", + "entity": "order", + }, + ) + assert created.status_code == 200 + template_id = created.json()["data"]["id"] + + version = client.post( + f"/exchange/templates/{template_id}/versions", + json={ + "version": "1.0.0", + "bindings": [], + "fields": [], + "placeholders": [], + }, + ) + assert version.status_code == 200 + + listed = client.get("/exchange/templates") + assert listed.status_code == 200 + assert listed.json()["data"][0]["code"] == "order" + + +def test_exchange_plan_routes_support_mapping_source(): + app = make_app() + client = TestClient(app) + + response = client.post( + "/exchange/plans/resolve", + json={ + "taskKind": "import", + "sourceKind": "mapping", + "fields": [ + {"key": "name", "label": "名称", "source": "name"}, + ], + "title": "映射", + "sheetName": "映射", + }, + ) + assert response.status_code == 200 + assert response.json()["data"]["title"] == "映射" + + file_resp = client.post( + "/exchange/plans/template-file", + json={ + "taskKind": "import", + "sourceKind": "mapping", + "fields": [ + {"key": "name", "label": "名称", "source": "name"}, + ], + "title": "映射", + "sheetName": "映射", + }, + ) + assert file_resp.status_code == 200