from __future__ import annotations from dataclasses import asdict from fastapi import APIRouter, Depends, File, Request, UploadFile from fastapi.responses import StreamingResponse from sqlalchemy.orm import Session from iti.db import get_db from iti.exceptions import BizError from iti.responses import ok, raw_response from .base import ExchangeField, ExchangePlaceholder, ExchangeTemplateBinding, ExchangePlan from .schemas import ( ExchangeFieldSchema, ExchangePlanResolveRequest, ExchangePlanTemplateFileRequest, ExchangePlaceholderSchema, ExchangeTemplateBindingSchema, ExchangeTemplateSourceKind, ExchangeTaskCreateRequest, ExchangeTaskResponse, ExchangeTemplateCreateRequest, ExchangeTemplateResponse, ExchangeTemplateVersionCreateRequest, ExchangeTemplateVersionResponse, ExchangeTemplateUpdateRequest, ) from .service import ExchangeService from .excel import ExcelTemplateCodec from .sources import get_exchange_source router = APIRouter(prefix="/exchange", tags=["exchange"]) def _template_payload(item): return { "id": item.id, "code": item.code, "name": item.name, "template_kind": item.template_kind, "entity": item.entity, "status": item.status, "description": item.description, "current_version": item.current_version, "meta": item.meta, "created_at": item.created_at, "updated_at": item.updated_at, } def _version_payload(item): return { "id": item.id, "template_id": item.template_id, "version": item.version, "template_kind": item.template_kind, "published_at": item.published_at, "file_key": item.file_key, "checksum": item.checksum, "bindings": item.bindings, "fields": item.fields, "placeholders": item.placeholders, "meta": item.meta, "created_at": item.created_at, "updated_at": item.updated_at, } def _task_payload(item): return { "id": item.id, "template_id": item.template_id, "template_version_id": item.template_version_id, "task_kind": item.task_kind, "status": item.status, "requested_by": item.requested_by, "storage_key": item.storage_key, "success_count": item.success_count, "failed_count": item.failed_count, "error_count": item.error_count, "message": item.message, "input_payload": item.input_payload, "result_payload": item.result_payload, "started_at": item.started_at, "finished_at": item.finished_at, "meta": item.meta, "created_at": item.created_at, "updated_at": item.updated_at, } def _plan_payload(plan: ExchangePlan): return { "template_kind": plan.template_kind, "template_id": plan.template_id, "version_id": plan.version_id, "version": plan.version, "bindings": _plan_schema_items(plan.bindings), "fields": _plan_schema_items(plan.fields), "placeholders": _plan_schema_items(plan.placeholders), "title": plan.title, "description": plan.description, "sheet_name": plan.sheet_name, "meta": plan.meta, } def _plan_schema_items(items): return [asdict(item) for item in items] def _binding_from_payload(item): return ExchangeTemplateBinding( entity=item.get("entity"), template_kind=item.get("template_kind"), handler=item.get("handler"), description=item.get("description"), default_sheet_name=item.get("default_sheet_name"), default_file_name=item.get("default_file_name"), title=item.get("title"), meta=item.get("meta") or {}, ) def _field_from_payload(item): return ExchangeField( key=item.get("key"), label=item.get("label"), placeholder=item.get("placeholder"), required=bool(item.get("required", False)), example=item.get("example"), width=item.get("width"), format=item.get("format"), source=item.get("source"), target=item.get("target"), options=tuple(tuple(option) for option in item.get("options") or []), meta=item.get("meta") or {}, ) def _placeholder_from_payload(item): return ExchangePlaceholder( key=item.get("key"), label=item.get("label"), description=item.get("description"), required=bool(item.get("required", False)), example=item.get("example"), ) def _resolve_source(payload, request: Request, db: Session): source_kind = payload.source_kind source_name = getattr(payload, "source_name", None) if source_name: return get_exchange_source( request.app, source_name=source_name, db=db, service_name=getattr(payload, "source_service", "exchange"), ) if source_kind == ExchangeTemplateSourceKind.LOCAL: return get_exchange_source(request.app, source_kind=source_kind, db=db) if source_kind == ExchangeTemplateSourceKind.REMOTE: return get_exchange_source( request.app, source_kind=source_kind, service_name=payload.source_service or "exchange", ) if source_kind == ExchangeTemplateSourceKind.MAPPING or source_kind is None: return get_exchange_source(request.app, source_kind=ExchangeTemplateSourceKind.MAPPING) return get_exchange_source(request.app, source_kind=source_kind, db=db) @router.get("/templates") def list_templates(request: Request, db: Session = Depends(get_db)): service = ExchangeService(request.app, db) return ok( [ ExchangeTemplateResponse.model_validate(_template_payload(item)).model_dump(mode="json") for item in service.list_templates() ] ) @router.get("/templates/{template_id}") def get_template(template_id: str, request: Request, db: Session = Depends(get_db)): service = ExchangeService(request.app, db) template = service.get_template_or_404(template_id) return ok(ExchangeTemplateResponse.model_validate(_template_payload(template)).model_dump(mode="json")) @router.post("/templates") def create_template( payload: ExchangeTemplateCreateRequest, request: Request, db: Session = Depends(get_db), ): service = ExchangeService(request.app, db) template = service.create_template( code=payload.code, name=payload.name, template_kind=payload.template_kind, entity=payload.entity, description=payload.description, meta=payload.meta, ) return ok(ExchangeTemplateResponse.model_validate(_template_payload(template)).model_dump(mode="json")) @router.patch("/templates/{template_id}") def update_template( template_id: str, payload: ExchangeTemplateUpdateRequest, request: Request, db: Session = Depends(get_db), ): service = ExchangeService(request.app, db) template = service.update_template( template_id, name=payload.name, description=payload.description, status=payload.status, current_version=payload.current_version, meta=payload.meta, ) return ok(ExchangeTemplateResponse.model_validate(_template_payload(template)).model_dump(mode="json")) @router.get("/templates/{template_id}/versions") def list_versions( template_id: str, request: Request, db: Session = Depends(get_db), ): service = ExchangeService(request.app, db) return ok( [ ExchangeTemplateVersionResponse.model_validate(_version_payload(item)).model_dump(mode="json") for item in service.list_versions(template_id) ] ) @router.get("/templates/{template_id}/versions/{version_id}") def get_version( template_id: str, version_id: str, request: Request, db: Session = Depends(get_db), ): service = ExchangeService(request.app, db) version = service.get_version_or_404(version_id) if version.template_id != template_id: raise BizError("模板版本不存在", code=404) return ok(ExchangeTemplateVersionResponse.model_validate(_version_payload(version)).model_dump(mode="json")) @router.get("/tasks") def list_tasks(request: Request, db: Session = Depends(get_db)): service = ExchangeService(request.app, db) return ok( [ ExchangeTaskResponse.model_validate(_task_payload(item)).model_dump(mode="json") for item in service.list_tasks() ] ) @router.get("/tasks/{task_id}") def get_task(task_id: str, request: Request, db: Session = Depends(get_db)): service = ExchangeService(request.app, db) task = service.get_task_or_404(task_id) return ok(ExchangeTaskResponse.model_validate(_task_payload(task)).model_dump(mode="json")) @router.post("/templates/{template_id}/versions") def publish_version( template_id: str, payload: ExchangeTemplateVersionCreateRequest, request: Request, db: Session = Depends(get_db), ): service = ExchangeService(request.app, db) version = service.publish_version( template_id=template_id, version=payload.version, bindings=[ ExchangeTemplateBinding(**item.model_dump()) for item in payload.bindings ], fields=[ExchangeField(**item.model_dump()) for item in payload.fields], placeholders=[ ExchangePlaceholder(**item.model_dump()) for item in payload.placeholders ], meta=payload.meta, make_current=payload.make_current, ) return ok(ExchangeTemplateVersionResponse.model_validate(_version_payload(version)).model_dump(mode="json")) @router.get("/template-versions/{version_id}/download") @raw_response def download_template_version( version_id: str, request: Request, db: Session = Depends(get_db), ): service = ExchangeService(request.app, db) content = service.build_template_file(version_id) return StreamingResponse( iter([content]), media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", headers={"Content-Disposition": 'attachment; filename="template.xlsx"'}, ) @router.post("/plans/resolve") def resolve_plan( payload: ExchangePlanResolveRequest, request: Request, db: Session = Depends(get_db), ): service = ExchangeService(request.app, db) source = _resolve_source(payload, request, db) plan = service.resolve_plan( template_kind=payload.template_kind, template_id=payload.template_id, version_id=payload.version_id, version=payload.version, bindings=[ExchangeTemplateBinding(**item.model_dump()) for item in payload.bindings], fields=[ExchangeField(**item.model_dump()) for item in payload.fields], placeholders=[ExchangePlaceholder(**item.model_dump()) for item in payload.placeholders], title=payload.title, description=payload.description, sheet_name=payload.sheet_name, meta=payload.meta, source=source, ) return ok(_plan_payload(plan)) @router.post("/plans/template-file") @raw_response def build_plan_template_file( payload: ExchangePlanTemplateFileRequest, request: Request, db: Session = Depends(get_db), ): service = ExchangeService(request.app, db) source = _resolve_source(payload, request, db) plan = service.resolve_plan( template_kind=payload.template_kind, template_id=payload.template_id, version_id=payload.version_id, version=payload.version, bindings=[ExchangeTemplateBinding(**item.model_dump()) for item in payload.bindings], fields=[ExchangeField(**item.model_dump()) for item in payload.fields], placeholders=[ExchangePlaceholder(**item.model_dump()) for item in payload.placeholders], title=payload.title, description=payload.description, sheet_name=payload.sheet_name, meta=payload.meta, source=source, ) content = source.load_template_file(plan) if source is not None else None if content is None: content = service.build_plan_template_file(plan) return StreamingResponse( iter([content]), media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", headers={"Content-Disposition": 'attachment; filename="template.xlsx"'}, ) @router.post("/templates/{template_id}/versions/upload") def upload_template_version( template_id: str, version: str, request: Request, file: UploadFile = File(...), db: Session = Depends(get_db), ): service = ExchangeService(request.app, db) content = file.file.read() parsed = ExcelTemplateCodec().load(content) snapshot = service.publish_version( template_id=template_id, version=version, bindings=[_binding_from_payload(item) for item in parsed.get("bindings", [])], fields=[_field_from_payload(item) for item in parsed.get("fields", [])], placeholders=[ _placeholder_from_payload(item) for item in parsed.get("placeholders", []) ], meta={ "title": parsed.get("title"), "description": parsed.get("description"), "sheet_name": parsed.get("sheet_name"), "source_file": file.filename, }, file_content=content, file_name=file.filename, ) return ok(ExchangeTemplateVersionResponse.model_validate(_version_payload(snapshot)).model_dump(mode="json")) @router.post("/tasks") def create_task( payload: ExchangeTaskCreateRequest, request: Request, db: Session = Depends(get_db), ): service = ExchangeService(request.app, db) source = _resolve_source(payload, request, db) plan = service.resolve_plan( template_kind=payload.task_kind, template_id=payload.template_id, version_id=payload.version_id, version=payload.version, bindings=[ExchangeTemplateBinding(**item.model_dump()) for item in payload.bindings], fields=[ExchangeField(**item.model_dump()) for item in payload.fields], placeholders=[ExchangePlaceholder(**item.model_dump()) for item in payload.placeholders], title=payload.title, description=payload.description, sheet_name=payload.sheet_name, meta=payload.meta, source=source, ) task = service.create_task( template_id=plan.template_id, version_id=plan.version_id, version=plan.version, task_kind=payload.task_kind, storage_key=payload.storage_key, input_payload=payload.input_payload, meta=payload.meta, ) return ok(ExchangeTaskResponse.model_validate(_task_payload(task)).model_dump(mode="json")) @router.get("/tasks/{task_id}/rows") def list_task_rows(task_id: str, request: Request, db: Session = Depends(get_db)): service = ExchangeService(request.app, db) return ok( [ { "id": item.id, "task_id": item.task_id, "row_index": item.row_index, "status": item.status, "data": item.data, "message": item.message, "result": item.result, } for item in service.list_task_rows(task_id) ] )