You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
iTi-Flask/iti/exchange/excel.py

249 lines
9.0 KiB
Python

from __future__ import annotations
from dataclasses import dataclass
from io import BytesIO
from typing import Any
import pandas as pd
from openpyxl import Workbook, load_workbook
from openpyxl.worksheet.worksheet import Worksheet
from .base import ExchangeTemplatePlan, ExchangeTemplateSnapshot, ExchangeVariable
@dataclass
class ExcelTemplateCodec:
"""Render and parse lightweight template workbooks."""
def build_workbook(self, plan: ExchangeTemplateSnapshot | ExchangeTemplatePlan) -> Workbook:
workbook = Workbook()
worksheet = workbook.active
worksheet.title = _safe_sheet_name(plan.layout.sheet_name or "Template")
self._write_header(worksheet, plan)
self._write_variables(worksheet, plan)
return workbook
def _write_header(
self,
worksheet: Worksheet,
plan: ExchangeTemplateSnapshot | ExchangeTemplatePlan,
) -> None:
worksheet["A1"] = plan.layout.title or plan.name or plan.generated_code()
worksheet["A2"] = "biz_domain"
worksheet["B2"] = plan.scope.biz_domain
worksheet["A3"] = "biz_obj"
worksheet["B3"] = plan.scope.biz_obj
worksheet["A4"] = "operation"
worksheet["B4"] = str(plan.scope.operation)
if plan.version:
worksheet["A5"] = "version"
worksheet["B5"] = plan.version
if plan.description:
worksheet["A6"] = "description"
worksheet["B6"] = plan.description
def _write_variables(
self,
worksheet: Worksheet,
plan: ExchangeTemplateSnapshot | ExchangeTemplatePlan,
) -> None:
start_row = 8
worksheet.cell(row=start_row, column=1, value="Variables")
headers = ["key", "label", "header", "required", "example", "description"]
for col, value in enumerate(headers, start=1):
worksheet.cell(row=start_row + 1, column=col, value=value)
for row_index, variable in enumerate(plan.variables, start=start_row + 2):
worksheet.cell(row=row_index, column=1, value=variable.key)
worksheet.cell(row=row_index, column=2, value=variable.label)
worksheet.cell(row=row_index, column=3, value=variable.header)
worksheet.cell(row=row_index, column=4, value=variable.required)
worksheet.cell(row=row_index, column=5, value=variable.example)
worksheet.cell(row=row_index, column=6, value=variable.description)
def dump(self, plan: ExchangeTemplateSnapshot | ExchangeTemplatePlan) -> bytes:
buffer = BytesIO()
self.build_workbook(plan).save(buffer)
return buffer.getvalue()
def load(self, content: bytes) -> dict[str, Any]:
workbook = load_workbook(BytesIO(content), data_only=True)
worksheet = workbook.active
payload = {
"title": worksheet["A1"].value,
"sheet_name": worksheet.title,
"variables": self._parse_variables(worksheet),
}
workbook.close()
return payload
def _parse_variables(self, worksheet: Worksheet) -> list[dict[str, Any]]:
variables: list[dict[str, Any]] = []
mode: str | None = None
headers: list[str] = []
for row in worksheet.iter_rows(values_only=True):
cells = list(row)
first = cells[0] if cells else None
if first == "Variables":
mode = "headers"
continue
if mode == "headers":
headers = [str(cell) if cell is not None else "" for cell in cells]
mode = "rows"
continue
if mode != "rows" or not any(cell is not None for cell in cells):
continue
item = {headers[idx]: cells[idx] for idx in range(min(len(headers), len(cells)))}
if item.get("key") is None:
continue
variables.append(
{
"key": item.get("key"),
"label": item.get("label") or item.get("key"),
"header": item.get("header"),
"required": bool(item.get("required", False)),
"example": item.get("example"),
"description": item.get("description"),
}
)
return variables
@dataclass
class ExcelWorkbookCodec:
"""Read and write exchange data workbooks."""
def export_rows(
self,
headers: list[str],
rows: list[dict[str, Any]],
*,
sheet_name: str = "Export",
) -> bytes:
buffer = BytesIO()
dataframe = pd.DataFrame.from_records(rows, columns=headers)
with pd.ExcelWriter(buffer, engine="openpyxl") as writer:
dataframe.to_excel(
writer,
index=False,
sheet_name=_safe_sheet_name(sheet_name),
)
return buffer.getvalue()
def import_rows(
self,
content: bytes,
*,
header_row: int = 1,
data_start_row: int | None = None,
) -> list[dict[str, Any]]:
dataframe = self._read_sheet(content)
if dataframe.empty and len(dataframe.columns) == 0:
return []
header_index = max(header_row - 1, 0)
if header_index >= len(dataframe):
return []
headers = [self._header_name(value) for value in dataframe.iloc[header_index].tolist()]
start_index = max((data_start_row or header_row + 1) - 1, 0)
return self._frame_to_records(dataframe.iloc[start_index:], headers)
def import_rows_with_variables(
self,
content: bytes,
*,
variables: list[ExchangeVariable],
header_row: int = 1,
data_start_row: int | None = None,
) -> list[dict[str, Any]]:
dataframe = self._read_sheet(content)
if dataframe.empty and len(dataframe.columns) == 0:
return []
header_index = max(header_row - 1, 0)
if header_index >= len(dataframe):
return []
header_map = {variable.workbook_header(): variable.key for variable in variables}
headers = [self._header_name(value) for value in dataframe.iloc[header_index].tolist()]
start_index = max((data_start_row or header_row + 1) - 1, 0)
return self._frame_to_records(dataframe.iloc[start_index:], headers, header_map=header_map)
def export_rows_with_variables(
self,
*,
variables: list[ExchangeVariable],
rows: list[dict[str, Any]],
sheet_name: str = "Export",
) -> bytes:
headers = [variable.workbook_header() for variable in variables]
normalized_rows: list[dict[str, Any]] = []
for row in rows:
item: dict[str, Any] = {}
for variable in variables:
item[variable.workbook_header()] = row.get(variable.key)
normalized_rows.append(item)
return self.export_rows(headers, normalized_rows, sheet_name=sheet_name)
def export_rows_with_plan(
self,
*,
plan: ExchangeTemplatePlan,
rows: list[dict[str, Any]],
sheet_name: str | None = None,
) -> bytes:
return self.export_rows_with_variables(
variables=list(plan.variables),
rows=rows,
sheet_name=sheet_name or plan.layout.sheet_name or "Export",
)
def _read_sheet(self, content: bytes) -> pd.DataFrame:
workbook = load_workbook(BytesIO(content), read_only=True, data_only=True)
sheet_name = workbook.active.title
workbook.close()
return pd.read_excel(
BytesIO(content),
sheet_name=sheet_name,
header=None,
dtype=object,
engine="openpyxl",
)
def _frame_to_records(
self,
dataframe: pd.DataFrame,
headers: list[str],
*,
header_map: dict[str, str] | None = None,
) -> list[dict[str, Any]]:
result: list[dict[str, Any]] = []
for values in dataframe.itertuples(index=False, name=None):
item: dict[str, Any] = {}
has_value = False
for index, header in enumerate(headers):
if not header:
continue
key = header_map.get(header, header) if header_map is not None else header
value = values[index] if index < len(values) else None
normalized = self._normalize_value(value)
if normalized is not None:
has_value = True
item[key] = normalized
if has_value:
result.append(item)
return result
@staticmethod
def _normalize_value(value: Any) -> Any:
return None if pd.isna(value) else value
@staticmethod
def _header_name(value: Any) -> str:
normalized = ExcelWorkbookCodec._normalize_value(value)
return "" if normalized is None else str(normalized)
def _safe_sheet_name(value: str) -> str:
cleaned = "".join(ch for ch in value if ch not in "[]:*?/\\")
cleaned = cleaned.strip()
if not cleaned:
cleaned = "Sheet"
return cleaned[:31]