You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
249 lines
9.0 KiB
Python
249 lines
9.0 KiB
Python
from __future__ import annotations
|
|
|
|
from dataclasses import dataclass
|
|
from io import BytesIO
|
|
from typing import Any
|
|
|
|
import pandas as pd
|
|
from openpyxl import Workbook, load_workbook
|
|
from openpyxl.worksheet.worksheet import Worksheet
|
|
|
|
from .base import ExchangeTemplatePlan, ExchangeTemplateSnapshot, ExchangeVariable
|
|
|
|
|
|
@dataclass
class ExcelTemplateCodec:
    """Render and parse lightweight template workbooks.

    Sheet layout produced by :meth:`build_workbook`:

    * ``A1`` - title (layout title, else plan name, else generated code)
    * rows 2-4 - ``biz_domain`` / ``biz_obj`` / ``operation`` label-value pairs
    * row 5 - ``version`` (only when set)
    * row 6 - ``description`` (only when set)
    * row 8 - the literal marker ``Variables``
    * row 9 - variable column headers
    * rows 10+ - one row per plan variable
    """

    # Column order of the variables table. Each name is both the header text
    # written to the sheet and the attribute read from every variable.
    _VARIABLE_COLUMNS = ("key", "label", "header", "required", "example", "description")

    # Cell texts (compared stripped and lower-cased) that mean "not required".
    _FALSE_TEXT = frozenset({"", "0", "false", "f", "no", "n", "off"})

    def build_workbook(self, plan: ExchangeTemplateSnapshot | ExchangeTemplatePlan) -> Workbook:
        """Create a single-sheet workbook describing ``plan``.

        The sheet is named after ``plan.layout.sheet_name`` (sanitised by
        ``_safe_sheet_name``), falling back to ``"Template"``.
        """
        workbook = Workbook()
        worksheet = workbook.active
        worksheet.title = _safe_sheet_name(plan.layout.sheet_name or "Template")
        self._write_header(worksheet, plan)
        self._write_variables(worksheet, plan)
        return workbook

    def _write_header(
        self,
        worksheet: Worksheet,
        plan: ExchangeTemplateSnapshot | ExchangeTemplatePlan,
    ) -> None:
        """Write the title and scope metadata into rows 1-6 of ``worksheet``."""
        worksheet["A1"] = plan.layout.title or plan.name or plan.generated_code()
        worksheet["A2"] = "biz_domain"
        worksheet["B2"] = plan.scope.biz_domain
        worksheet["A3"] = "biz_obj"
        worksheet["B3"] = plan.scope.biz_obj
        worksheet["A4"] = "operation"
        worksheet["B4"] = str(plan.scope.operation)
        # Optional rows keep fixed positions, so row 5 can stay empty while
        # row 6 is filled.
        if plan.version:
            worksheet["A5"] = "version"
            worksheet["B5"] = plan.version
        if plan.description:
            worksheet["A6"] = "description"
            worksheet["B6"] = plan.description

    def _write_variables(
        self,
        worksheet: Worksheet,
        plan: ExchangeTemplateSnapshot | ExchangeTemplatePlan,
    ) -> None:
        """Write the ``Variables`` marker, the header row and one row per variable."""
        start_row = 8
        columns = ExcelTemplateCodec._VARIABLE_COLUMNS
        worksheet.cell(row=start_row, column=1, value="Variables")
        for col, name in enumerate(columns, start=1):
            worksheet.cell(row=start_row + 1, column=col, value=name)
        for row_index, variable in enumerate(plan.variables, start=start_row + 2):
            for col, name in enumerate(columns, start=1):
                worksheet.cell(row=row_index, column=col, value=getattr(variable, name))

    def dump(self, plan: ExchangeTemplateSnapshot | ExchangeTemplatePlan) -> bytes:
        """Return the template workbook for ``plan`` serialised as xlsx bytes."""
        buffer = BytesIO()
        self.build_workbook(plan).save(buffer)
        return buffer.getvalue()

    def load(self, content: bytes) -> dict[str, Any]:
        """Parse xlsx bytes back into a plain dictionary.

        Returns a dict with ``title`` (value of ``A1``), ``sheet_name`` (title
        of the active sheet) and ``variables`` (see :meth:`_parse_variables`).
        Only the active sheet is inspected.
        """
        workbook = load_workbook(BytesIO(content), data_only=True)
        try:
            worksheet = workbook.active
            return {
                "title": worksheet["A1"].value,
                "sheet_name": worksheet.title,
                "variables": self._parse_variables(worksheet),
            }
        finally:
            # Close the workbook even when parsing raises.
            workbook.close()

    def _parse_variables(self, worksheet: Worksheet) -> list[dict[str, Any]]:
        """Extract the variables table from ``worksheet``.

        Scans rows top to bottom. A row whose first cell equals ``"Variables"``
        announces the table; the following row supplies the column headers and
        every later row is a variable. Fully empty rows and rows without a
        ``key`` value are skipped. ``label`` falls back to ``key``.

        NOTE: a data row whose first cell is literally ``"Variables"`` restarts
        header detection, so that text cannot be used as a variable key.
        """
        variables: list[dict[str, Any]] = []
        mode: str | None = None
        headers: list[str] = []
        for row in worksheet.iter_rows(values_only=True):
            cells = list(row)
            first = cells[0] if cells else None
            if first == "Variables":
                mode = "headers"
                continue
            if mode == "headers":
                headers = ["" if cell is None else str(cell) for cell in cells]
                mode = "rows"
                continue
            if mode != "rows" or all(cell is None for cell in cells):
                continue
            # zip() stops at the shorter sequence, so ragged rows are safe.
            item = dict(zip(headers, cells))
            key = item.get("key")
            if key is None:
                continue
            variables.append(
                {
                    "key": key,
                    "label": item.get("label") or key,
                    "header": item.get("header"),
                    "required": ExcelTemplateCodec._coerce_bool(item.get("required")),
                    "example": item.get("example"),
                    "description": item.get("description"),
                }
            )
        return variables

    @staticmethod
    def _coerce_bool(value: Any) -> bool:
        """Interpret a cell value as a flag.

        Text is compared case-insensitively against ``_FALSE_TEXT`` so that a
        hand-typed ``"FALSE"`` or ``"no"`` is not mistaken for true merely
        because it is a non-empty string. Everything else uses ``bool()``.
        """
        if isinstance(value, str):
            return value.strip().lower() not in ExcelTemplateCodec._FALSE_TEXT
        return bool(value)
|
|
|
|
|
|
@dataclass
class ExcelWorkbookCodec:
    """Read and write exchange data workbooks.

    Row numbers accepted by the import methods are 1-based, matching what a
    user sees in a spreadsheet application.
    """

    def export_rows(
        self,
        headers: list[str],
        rows: list[dict[str, Any]],
        *,
        sheet_name: str = "Export",
    ) -> bytes:
        """Serialise ``rows`` to xlsx bytes with ``headers`` as the column order.

        Only keys listed in ``headers`` are written. The sheet name is passed
        through ``_safe_sheet_name``.
        """
        buffer = BytesIO()
        dataframe = pd.DataFrame.from_records(rows, columns=headers)
        with pd.ExcelWriter(buffer, engine="openpyxl") as writer:
            dataframe.to_excel(
                writer,
                index=False,
                sheet_name=_safe_sheet_name(sheet_name),
            )
        return buffer.getvalue()

    def import_rows(
        self,
        content: bytes,
        *,
        header_row: int = 1,
        data_start_row: int | None = None,
    ) -> list[dict[str, Any]]:
        """Read the active sheet into a list of ``{header: value}`` records.

        ``header_row`` is the 1-based row holding the column names and
        ``data_start_row`` the 1-based first data row (default: the row after
        the header). Rows without any value are dropped.
        """
        return self._extract_records(
            self._read_sheet(content),
            header_row=header_row,
            data_start_row=data_start_row,
        )

    def import_rows_with_variables(
        self,
        content: bytes,
        *,
        variables: list[ExchangeVariable],
        header_row: int = 1,
        data_start_row: int | None = None,
    ) -> list[dict[str, Any]]:
        """Like :meth:`import_rows`, but key records by variable key.

        Columns whose header equals a variable's ``workbook_header()`` are
        stored under that variable's ``key``; other columns keep their header
        text as the key.
        """
        dataframe = self._read_sheet(content)
        header_map = {variable.workbook_header(): variable.key for variable in variables}
        return self._extract_records(
            dataframe,
            header_row=header_row,
            data_start_row=data_start_row,
            header_map=header_map,
        )

    def export_rows_with_variables(
        self,
        *,
        variables: list[ExchangeVariable],
        rows: list[dict[str, Any]],
        sheet_name: str = "Export",
    ) -> bytes:
        """Export ``rows`` (keyed by variable key) under workbook headers.

        Keys missing from a row produce empty cells.
        """
        # Resolve each header once instead of once per row and variable.
        columns = [(variable.workbook_header(), variable.key) for variable in variables]
        headers = [header for header, _ in columns]
        normalized_rows: list[dict[str, Any]] = [
            {header: row.get(key) for header, key in columns} for row in rows
        ]
        return self.export_rows(headers, normalized_rows, sheet_name=sheet_name)

    def export_rows_with_plan(
        self,
        *,
        plan: ExchangeTemplatePlan,
        rows: list[dict[str, Any]],
        sheet_name: str | None = None,
    ) -> bytes:
        """Export ``rows`` using the variables and sheet name of ``plan``.

        The sheet name resolves to the explicit ``sheet_name``, then
        ``plan.layout.sheet_name``, then ``"Export"``.
        """
        return self.export_rows_with_variables(
            variables=list(plan.variables),
            rows=rows,
            sheet_name=sheet_name or plan.layout.sheet_name or "Export",
        )

    def _read_sheet(self, content: bytes) -> pd.DataFrame:
        """Load the active sheet as a header-less, object-typed DataFrame."""
        # The workbook is opened only to learn which sheet is active, so
        # pandas can be asked for that sheet by name.
        workbook = load_workbook(BytesIO(content), read_only=True, data_only=True)
        try:
            sheet_name = workbook.active.title
        finally:
            workbook.close()
        return pd.read_excel(
            BytesIO(content),
            sheet_name=sheet_name,
            header=None,
            dtype=object,
            engine="openpyxl",
        )

    def _extract_records(
        self,
        dataframe: pd.DataFrame,
        *,
        header_row: int,
        data_start_row: int | None,
        header_map: dict[str, str] | None = None,
    ) -> list[dict[str, Any]]:
        """Turn a raw sheet frame into records; shared by both import methods.

        Returns ``[]`` for a completely empty frame or when ``header_row``
        lies beyond the last row.
        """
        if dataframe.empty and len(dataframe.columns) == 0:
            return []
        # Clamp once so the header lookup and the default data start agree.
        # Otherwise header_row <= 0 would read row 1 as the header and then
        # return that same row again as data.
        header_row = max(header_row, 1)
        header_index = header_row - 1
        if header_index >= len(dataframe):
            return []
        headers = [self._header_name(value) for value in dataframe.iloc[header_index].tolist()]
        first_data_row = data_start_row or header_row + 1
        start_index = max(first_data_row - 1, 0)
        return self._frame_to_records(dataframe.iloc[start_index:], headers, header_map=header_map)

    def _frame_to_records(
        self,
        dataframe: pd.DataFrame,
        headers: list[str],
        *,
        header_map: dict[str, str] | None = None,
    ) -> list[dict[str, Any]]:
        """Convert frame rows to dicts keyed by (optionally remapped) header.

        Columns with an empty header are ignored. Missing values become
        ``None``. A row is kept only if at least one named column has a value.
        """
        result: list[dict[str, Any]] = []
        for values in dataframe.itertuples(index=False, name=None):
            item: dict[str, Any] = {}
            has_value = False
            for index, header in enumerate(headers):
                if not header:
                    continue
                key = header_map.get(header, header) if header_map is not None else header
                value = values[index] if index < len(values) else None
                normalized = self._normalize_value(value)
                if normalized is not None:
                    has_value = True
                # Stored even when None, so every record has the same keys.
                item[key] = normalized
            if has_value:
                result.append(item)
        return result

    @staticmethod
    def _normalize_value(value: Any) -> Any:
        """Map any pandas missing value (``None``, NaN, NaT) to ``None``."""
        return None if pd.isna(value) else value

    @staticmethod
    def _header_name(value: Any) -> str:
        """Return a header cell as text; missing cells become ``""``."""
        normalized = ExcelWorkbookCodec._normalize_value(value)
        return "" if normalized is None else str(normalized)
|
|
|
|
|
|
def _safe_sheet_name(value: str) -> str:
    """Return ``value`` turned into a worksheet title Excel will accept.

    The characters ``[ ] : * ? / \\`` are removed, surrounding whitespace and
    apostrophes are trimmed (Excel rejects titles that begin or end with an
    apostrophe), an empty result becomes ``"Sheet"``, and the title is cut to
    the 31-character limit.
    """
    forbidden = "[]:*?/\\"
    cleaned = "".join(ch for ch in value if ch not in forbidden)
    # Repeat both strips until nothing changes: removing an apostrophe can
    # expose whitespace and vice versa (e.g. "' 'x").
    while True:
        trimmed = cleaned.strip().strip("'")
        if trimmed == cleaned:
            break
        cleaned = trimmed
    if not cleaned:
        cleaned = "Sheet"
    # Truncation may leave an apostrophe at the new end; drop it as well.
    return cleaned[:31].rstrip("'")
|