from __future__ import annotations from dataclasses import dataclass from io import BytesIO from typing import Any import pandas as pd from openpyxl import Workbook, load_workbook from openpyxl.worksheet.worksheet import Worksheet from .base import ExchangeTemplatePlan, ExchangeTemplateSnapshot, ExchangeVariable @dataclass class ExcelTemplateCodec: """Render and parse lightweight template workbooks.""" def build_workbook(self, plan: ExchangeTemplateSnapshot | ExchangeTemplatePlan) -> Workbook: workbook = Workbook() worksheet = workbook.active worksheet.title = _safe_sheet_name(plan.layout.sheet_name or "Template") self._write_header(worksheet, plan) self._write_variables(worksheet, plan) return workbook def _write_header( self, worksheet: Worksheet, plan: ExchangeTemplateSnapshot | ExchangeTemplatePlan, ) -> None: worksheet["A1"] = plan.layout.title or plan.name or plan.generated_code() worksheet["A2"] = "biz_domain" worksheet["B2"] = plan.scope.biz_domain worksheet["A3"] = "biz_obj" worksheet["B3"] = plan.scope.biz_obj worksheet["A4"] = "operation" worksheet["B4"] = str(plan.scope.operation) if plan.version: worksheet["A5"] = "version" worksheet["B5"] = plan.version if plan.description: worksheet["A6"] = "description" worksheet["B6"] = plan.description def _write_variables( self, worksheet: Worksheet, plan: ExchangeTemplateSnapshot | ExchangeTemplatePlan, ) -> None: start_row = 8 worksheet.cell(row=start_row, column=1, value="Variables") headers = ["key", "label", "header", "required", "example", "description"] for col, value in enumerate(headers, start=1): worksheet.cell(row=start_row + 1, column=col, value=value) for row_index, variable in enumerate(plan.variables, start=start_row + 2): worksheet.cell(row=row_index, column=1, value=variable.key) worksheet.cell(row=row_index, column=2, value=variable.label) worksheet.cell(row=row_index, column=3, value=variable.header) worksheet.cell(row=row_index, column=4, value=variable.required) worksheet.cell(row=row_index, column=5, value=variable.example) worksheet.cell(row=row_index, column=6, value=variable.description) def dump(self, plan: ExchangeTemplateSnapshot | ExchangeTemplatePlan) -> bytes: buffer = BytesIO() self.build_workbook(plan).save(buffer) return buffer.getvalue() def load(self, content: bytes) -> dict[str, Any]: workbook = load_workbook(BytesIO(content), data_only=True) worksheet = workbook.active payload = { "title": worksheet["A1"].value, "sheet_name": worksheet.title, "variables": self._parse_variables(worksheet), } workbook.close() return payload def _parse_variables(self, worksheet: Worksheet) -> list[dict[str, Any]]: variables: list[dict[str, Any]] = [] mode: str | None = None headers: list[str] = [] for row in worksheet.iter_rows(values_only=True): cells = list(row) first = cells[0] if cells else None if first == "Variables": mode = "headers" continue if mode == "headers": headers = [str(cell) if cell is not None else "" for cell in cells] mode = "rows" continue if mode != "rows" or not any(cell is not None for cell in cells): continue item = {headers[idx]: cells[idx] for idx in range(min(len(headers), len(cells)))} if item.get("key") is None: continue variables.append( { "key": item.get("key"), "label": item.get("label") or item.get("key"), "header": item.get("header"), "required": bool(item.get("required", False)), "example": item.get("example"), "description": item.get("description"), } ) return variables @dataclass class ExcelWorkbookCodec: """Read and write exchange data workbooks.""" def export_rows( self, headers: list[str], rows: list[dict[str, Any]], *, sheet_name: str = "Export", ) -> bytes: buffer = BytesIO() dataframe = pd.DataFrame.from_records(rows, columns=headers) with pd.ExcelWriter(buffer, engine="openpyxl") as writer: dataframe.to_excel( writer, index=False, sheet_name=_safe_sheet_name(sheet_name), ) return buffer.getvalue() def import_rows( self, content: bytes, *, header_row: int = 1, data_start_row: int | None = None, ) -> list[dict[str, Any]]: dataframe = self._read_sheet(content) if dataframe.empty and len(dataframe.columns) == 0: return [] header_index = max(header_row - 1, 0) if header_index >= len(dataframe): return [] headers = [self._header_name(value) for value in dataframe.iloc[header_index].tolist()] start_index = max((data_start_row or header_row + 1) - 1, 0) return self._frame_to_records(dataframe.iloc[start_index:], headers) def import_rows_with_variables( self, content: bytes, *, variables: list[ExchangeVariable], header_row: int = 1, data_start_row: int | None = None, ) -> list[dict[str, Any]]: dataframe = self._read_sheet(content) if dataframe.empty and len(dataframe.columns) == 0: return [] header_index = max(header_row - 1, 0) if header_index >= len(dataframe): return [] header_map = {variable.workbook_header(): variable.key for variable in variables} headers = [self._header_name(value) for value in dataframe.iloc[header_index].tolist()] start_index = max((data_start_row or header_row + 1) - 1, 0) return self._frame_to_records(dataframe.iloc[start_index:], headers, header_map=header_map) def export_rows_with_variables( self, *, variables: list[ExchangeVariable], rows: list[dict[str, Any]], sheet_name: str = "Export", ) -> bytes: headers = [variable.workbook_header() for variable in variables] normalized_rows: list[dict[str, Any]] = [] for row in rows: item: dict[str, Any] = {} for variable in variables: item[variable.workbook_header()] = row.get(variable.key) normalized_rows.append(item) return self.export_rows(headers, normalized_rows, sheet_name=sheet_name) def export_rows_with_plan( self, *, plan: ExchangeTemplatePlan, rows: list[dict[str, Any]], sheet_name: str | None = None, ) -> bytes: return self.export_rows_with_variables( variables=list(plan.variables), rows=rows, sheet_name=sheet_name or plan.layout.sheet_name or "Export", ) def _read_sheet(self, content: bytes) -> pd.DataFrame: workbook = load_workbook(BytesIO(content), read_only=True, data_only=True) sheet_name = workbook.active.title workbook.close() return pd.read_excel( BytesIO(content), sheet_name=sheet_name, header=None, dtype=object, engine="openpyxl", ) def _frame_to_records( self, dataframe: pd.DataFrame, headers: list[str], *, header_map: dict[str, str] | None = None, ) -> list[dict[str, Any]]: result: list[dict[str, Any]] = [] for values in dataframe.itertuples(index=False, name=None): item: dict[str, Any] = {} has_value = False for index, header in enumerate(headers): if not header: continue key = header_map.get(header, header) if header_map is not None else header value = values[index] if index < len(values) else None normalized = self._normalize_value(value) if normalized is not None: has_value = True item[key] = normalized if has_value: result.append(item) return result @staticmethod def _normalize_value(value: Any) -> Any: return None if pd.isna(value) else value @staticmethod def _header_name(value: Any) -> str: normalized = ExcelWorkbookCodec._normalize_value(value) return "" if normalized is None else str(normalized) def _safe_sheet_name(value: str) -> str: cleaned = "".join(ch for ch in value if ch not in "[]:*?/\\") cleaned = cleaned.strip() if not cleaned: cleaned = "Sheet" return cleaned[:31]