# You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
# iTi-Flask/iti/exchange/schemas.py
#
# 153 lines
# 4.5 KiB
# Python

from __future__ import annotations
from datetime import datetime
from typing import Any
from pydantic import AliasChoices, Field
from iti.schemas import ItiModel
from .base import ExchangeTemplateKind, ExchangeTemplateSourceKind, ExchangeTaskKind
class ExchangePlaceholderSchema(ItiModel):
    # Schema for one placeholder entry, identified by a key and a label.

    # Mandatory identification fields (no defaults).
    key: str
    label: str

    # Optional details; each falls back to the default shown when omitted.
    description: str | None = None
    required: bool = False
    example: str | None = None
class ExchangeFieldSchema(ItiModel):
    # Schema for one field entry. Only `key` and `label` are mandatory; every
    # other attribute has a default.

    # Mandatory identification fields.
    key: str
    label: str

    # Optional scalar attributes.
    placeholder: str | None = None
    required: bool = False
    example: str | None = None
    width: int | None = None
    format: str | None = None
    source: str | None = None
    target: str | None = None

    # Container attributes use default_factory so every instance receives its
    # own fresh list/dict instead of sharing one mutable default.
    # `options` holds (str, str) pairs; what each element of a pair means is
    # not visible in this module.
    options: list[tuple[str, str]] = Field(default_factory=list)
    meta: dict[str, Any] = Field(default_factory=dict)
class ExchangeTemplateBindingSchema(ItiModel):
    # Schema for one template binding: an entity name paired with a template
    # kind, plus optional settings.

    # Mandatory fields.
    entity: str
    template_kind: ExchangeTemplateKind

    # Optional settings, all defaulting to None.
    handler: str | None = None
    description: str | None = None
    default_sheet_name: str | None = None
    default_file_name: str | None = None
    title: str | None = None

    # Free-form extra data; a new empty dict is created per instance.
    meta: dict[str, Any] = Field(default_factory=dict)
class ExchangeTemplateCreateRequest(ItiModel):
    # Request payload for creating a template.

    # Mandatory fields (no defaults).
    code: str
    name: str
    template_kind: ExchangeTemplateKind
    entity: str

    # Optional fields.
    description: str | None = None
    meta: dict[str, Any] = Field(default_factory=dict)
class ExchangeTemplateUpdateRequest(ItiModel):
    # Request payload for updating a template. Every field is optional and
    # defaults to None.
    # NOTE(review): None presumably means "leave unchanged" -- confirm against
    # the code that consumes this request.

    name: str | None = None
    description: str | None = None
    status: str | None = None
    current_version: str | None = None

    # Unlike the other schemas in this module, `meta` defaults to None here
    # rather than to an empty dict.
    meta: dict[str, Any] | None = None
class ExchangeTemplateVersionCreateRequest(ItiModel):
    # Request payload for creating a template version.

    # Mandatory version identifier.
    version: str

    # Nested schema collections; each defaults to a fresh empty list.
    bindings: list[ExchangeTemplateBindingSchema] = Field(default_factory=list)
    fields: list[ExchangeFieldSchema] = Field(default_factory=list)
    placeholders: list[ExchangePlaceholderSchema] = Field(default_factory=list)

    # Free-form extra data; a new empty dict is created per instance.
    meta: dict[str, Any] = Field(default_factory=dict)

    # Defaults to True.
    # NOTE(review): presumably marks the new version as the template's current
    # one -- confirm in the handling service.
    make_current: bool = True
class ExchangePlanRequest(ItiModel):
    # Base request payload for exchange plans. Every field has a default, so
    # an empty payload validates against this class.

    # Template / version references, all optional.
    template_id: str | None = None
    version_id: str | None = None
    version: str | None = None

    # Source description, all optional.
    source_kind: ExchangeTemplateSourceKind | None = None
    source_name: str | None = None
    source_service: str | None = None

    # Nested schema collections; each defaults to a fresh empty list.
    bindings: list[ExchangeTemplateBindingSchema] = Field(default_factory=list)
    fields: list[ExchangeFieldSchema] = Field(default_factory=list)
    placeholders: list[ExchangePlaceholderSchema] = Field(default_factory=list)

    # Optional presentation attributes.
    title: str | None = None
    description: str | None = None
    sheet_name: str | None = None

    # Free-form extra data; a new empty dict is created per instance.
    meta: dict[str, Any] = Field(default_factory=dict)
class ExchangePlanResolveRequest(ExchangePlanRequest):
    # ExchangePlanRequest extended with a mandatory template kind.

    # Required (no default). During validation the value is looked up under
    # the input key "templateKind" first, then "taskKind".
    # NOTE(review): whether the plain field name `template_kind` is accepted
    # as an input key depends on the ItiModel configuration, which is not
    # visible in this module -- confirm.
    template_kind: ExchangeTemplateKind = Field(
        validation_alias=AliasChoices("templateKind", "taskKind"),
    )
class ExchangePlanTemplateFileRequest(ExchangePlanResolveRequest):
    # Adds no fields of its own; it only gives ExchangePlanResolveRequest a
    # distinct type name.
    pass
class ExchangeTaskCreateRequest(ExchangePlanRequest):
    # ExchangePlanRequest extended with the fields needed to create a task.

    # Mandatory task kind (no default).
    task_kind: ExchangeTaskKind

    # Optional storage reference.
    storage_key: str | None = None

    # Arbitrary input data; a new empty dict is created per instance.
    input_payload: dict[str, Any] = Field(default_factory=dict)
class ExchangeTemplateResponse(ItiModel):
    # Response payload describing a template.

    # Mandatory fields. `template_kind` is typed as a plain str here, whereas
    # the request schemas use the ExchangeTemplateKind type.
    id: str
    code: str
    name: str
    template_kind: str
    entity: str
    status: str

    # Optional fields.
    description: str | None = None
    current_version: str | None = None
    meta: dict[str, Any] = Field(default_factory=dict)

    # Mandatory timestamps.
    created_at: datetime
    updated_at: datetime
class ExchangeTemplateVersionResponse(ItiModel):
    # Response payload describing one version of a template.

    # Mandatory identification fields.
    id: str
    template_id: str
    version: str
    template_kind: str

    # Optional attributes, all defaulting to None.
    published_at: datetime | None = None
    file_key: str | None = None
    checksum: str | None = None

    # Collections are typed as plain dicts here, not as the nested schema
    # classes the request models use; each defaults to a fresh empty list.
    bindings: list[dict[str, Any]] = Field(default_factory=list)
    fields: list[dict[str, Any]] = Field(default_factory=list)
    placeholders: list[dict[str, Any]] = Field(default_factory=list)

    # Free-form extra data; a new empty dict is created per instance.
    meta: dict[str, Any] = Field(default_factory=dict)

    # Mandatory timestamps.
    created_at: datetime
    updated_at: datetime
class ExchangeTaskResponse(ItiModel):
    # Response payload describing an exchange task.

    # Mandatory task id.
    id: str

    # Optional template references.
    template_id: str | None = None
    template_version_id: str | None = None

    # Mandatory kind and status, both typed as plain str.
    task_kind: str
    status: str

    # Optional requester and storage reference.
    requested_by: str | None = None
    storage_key: str | None = None

    # Counters are mandatory (no defaults).
    # NOTE(review): the distinction between `failed_count` and `error_count`
    # is not visible in this module -- confirm with the task implementation.
    success_count: int
    failed_count: int
    error_count: int

    # Optional message.
    message: str | None = None

    # Payloads; each defaults to a fresh empty dict.
    input_payload: dict[str, Any] = Field(default_factory=dict)
    result_payload: dict[str, Any] = Field(default_factory=dict)

    # Optional timing; both default to None.
    started_at: datetime | None = None
    finished_at: datetime | None = None

    # Free-form extra data; a new empty dict is created per instance.
    meta: dict[str, Any] = Field(default_factory=dict)

    # Mandatory timestamps.
    created_at: datetime
    updated_at: datetime