from __future__ import annotations

from dataclasses import dataclass
from io import BytesIO
from typing import Any

import pandas as pd
from openpyxl import Workbook, load_workbook
from openpyxl.worksheet.worksheet import Worksheet

from .base import ExchangeField, ExchangePlaceholder, ExchangePlan, ExchangeTemplateSnapshot

# Column layouts shared by the template writer and parser so that both sides
# of the round trip always agree on names and order.
_BINDING_COLUMNS = (
    "entity",
    "template_kind",
    "handler",
    "description",
    "default_sheet_name",
    "default_file_name",
    "title",
)
_PLACEHOLDER_COLUMNS = ("key", "label", "description", "example", "required")
_FIELD_COLUMNS = (
    "key",
    "label",
    "placeholder",
    "required",
    "example",
    "format",
    "source",
    "target",
)

# Characters removed from sheet names by ``_safe_sheet_name``.
_INVALID_SHEET_CHARS = "[]:*?/\\"
_MAX_SHEET_NAME_LENGTH = 31


@dataclass
class ExcelTemplateCodec:
    """Render and parse template workbooks.

    A template workbook is a single sheet: title, version and description in
    ``A1``-``A3``, then (starting at row 5) an optional ``Bindings`` section,
    an optional ``Placeholders`` section and a ``Fields`` section.  Each
    section starts with a marker row whose first cell holds the section name.
    """

    def build_workbook(self, snapshot: ExchangeTemplateSnapshot | ExchangePlan) -> Workbook:
        """Build a workbook describing *snapshot*.

        The sheet title is the first truthy value among the snapshot's
        ``sheet_name`` attribute, ``meta["sheet_name"]``, the first binding's
        ``default_sheet_name`` and the literal ``"Template"``.  The chosen
        title is passed through ``_safe_sheet_name`` before being assigned.
        """
        workbook = Workbook()
        worksheet = workbook.active

        # ``meta`` is optional on the snapshot (``_write_header`` already
        # treats it that way), so never dereference it unguarded.
        meta = getattr(snapshot, "meta", None) or {}
        title = (
            getattr(snapshot, "sheet_name", None)
            or meta.get("sheet_name")
            or (snapshot.bindings[0].default_sheet_name if snapshot.bindings else None)
            or "Template"
        )
        # Sanitise the resolved title, not just the constant fallback, so that
        # caller-supplied names are treated the same way as in ``export_rows``.
        worksheet.title = _safe_sheet_name(str(title))

        row = self._write_header(worksheet, snapshot)
        row = self._write_bindings(worksheet, snapshot, row)
        row = self._write_placeholders(worksheet, snapshot, row)
        self._write_fields(worksheet, snapshot, row)
        return workbook

    @staticmethod
    def _write_row(worksheet: Worksheet, row: int, values: Any) -> int:
        """Write *values* into consecutive columns of *row*; return the next row index."""
        for col, value in enumerate(values, start=1):
            worksheet.cell(row=row, column=col, value=value)
        return row + 1

    def _write_header(
        self, worksheet: Worksheet, snapshot: ExchangeTemplateSnapshot | ExchangePlan
    ) -> int:
        """Write title, version and description into ``A1``-``A3``.

        Returns:
            The row index (5) at which the first section starts.
        """
        meta = getattr(snapshot, "meta", None) or {}
        title = getattr(snapshot, "title", None)
        version = getattr(snapshot, "version", None) or meta.get("version")
        description = getattr(snapshot, "description", None)

        worksheet["A1"] = (
            title or meta.get("title") or getattr(snapshot, "template_id", None) or "Template"
        )
        if version:
            worksheet["A2"] = f"version: {version}"

        # NOTE(review): unlike title and version, the description prefers the
        # ``meta`` value over the attribute; kept as-is to preserve output.
        header_description = meta.get("description") or description
        if header_description:
            worksheet["A3"] = header_description

        # Rows 1-3 hold the header, row 4 is left blank as a separator.
        return 5

    def _write_bindings(
        self, worksheet: Worksheet, snapshot: ExchangeTemplateSnapshot | ExchangePlan, row: int
    ) -> int:
        """Write the ``Bindings`` section starting at *row*.

        Nothing is written when the snapshot has no bindings.

        Returns:
            The first row index after the section.
        """
        if not snapshot.bindings:
            return row

        row = self._write_row(worksheet, row, ["Bindings"])
        row = self._write_row(worksheet, row, _BINDING_COLUMNS)
        for binding in snapshot.bindings:
            row = self._write_row(
                worksheet,
                row,
                [
                    binding.entity,
                    _enum_value(binding.template_kind),
                    binding.handler,
                    binding.description,
                    binding.default_sheet_name,
                    binding.default_file_name,
                    binding.title,
                ],
            )
        return row

    def _write_placeholders(
        self, worksheet: Worksheet, snapshot: ExchangeTemplateSnapshot | ExchangePlan, row: int
    ) -> int:
        """Write the ``Placeholders`` section starting at *row*.

        The section has a marker row but no column-header row.  Nothing is
        written when the snapshot has no placeholders.

        Returns:
            The first row index after the section.
        """
        if not snapshot.placeholders:
            return row

        row = self._write_row(worksheet, row, ["Placeholders"])
        for placeholder in snapshot.placeholders:
            row = self._write_row(
                worksheet,
                row,
                [
                    placeholder.key,
                    placeholder.label,
                    placeholder.description,
                    placeholder.example,
                    placeholder.required,
                ],
            )
        return row

    def _write_fields(
        self, worksheet: Worksheet, snapshot: ExchangeTemplateSnapshot | ExchangePlan, row: int
    ) -> None:
        """Write the ``Fields`` section (marker, column headers, one row per field).

        The marker and header rows are written even when there are no fields.
        """
        row = self._write_row(worksheet, row, ["Fields"])
        row = self._write_row(worksheet, row, _FIELD_COLUMNS)
        for field in snapshot.fields:
            row = self._write_row(
                worksheet, row, [getattr(field, name) for name in _FIELD_COLUMNS]
            )

    def dump(self, snapshot: ExchangeTemplateSnapshot | ExchangePlan) -> bytes:
        """Return the workbook built for *snapshot* serialised as bytes."""
        buffer = BytesIO()
        self.build_workbook(snapshot).save(buffer)
        return buffer.getvalue()

    def load(self, content: bytes) -> dict[str, Any]:
        """Parse a template workbook from *content*.

        Returns:
            A dict with keys ``title``, ``version``, ``description``,
            ``sheet_name``, ``bindings``, ``placeholders`` and ``fields``.
            The first three are the raw values of cells ``A1``-``A3``.
        """
        workbook = load_workbook(BytesIO(content))
        worksheet = workbook.active
        # NOTE(review): ``version`` is the raw ``A2`` value, which for
        # workbooks produced by ``_write_header`` still carries the
        # "version: " prefix.  Left unchanged because callers may rely on it.
        payload: dict[str, Any] = {
            "title": worksheet["A1"].value,
            "version": worksheet["A2"].value,
            "description": worksheet["A3"].value,
            "sheet_name": worksheet.title,
        }
        bindings, placeholders, fields = self._parse_sections(worksheet)
        payload["bindings"] = bindings
        payload["placeholders"] = placeholders
        payload["fields"] = fields
        return payload

    def _parse_sections(
        self, worksheet: Worksheet
    ) -> tuple[list[dict[str, Any]], list[dict[str, Any]], list[dict[str, Any]]]:
        """Scan every row of *worksheet* and collect the three sections.

        A row whose first cell equals ``"Bindings"``, ``"Placeholders"`` or
        ``"Fields"`` switches the parser to that section.  For bindings and
        fields the row following the marker is taken as the column headers.
        Rows seen before any marker and completely empty rows are ignored.

        Returns:
            ``(bindings, placeholders, fields)`` as lists of plain dicts.
        """
        bindings: list[dict[str, Any]] = []
        placeholders: list[dict[str, Any]] = []
        fields: list[dict[str, Any]] = []
        mode: str | None = None
        headers: list[str] = []

        for row in worksheet.iter_rows(values_only=True):
            cells = list(row)
            first = cells[0] if cells else None

            # Section markers take effect regardless of the current section.
            if first == "Bindings":
                mode, headers = "bindings_headers", []
                continue
            if first == "Placeholders":
                mode = "placeholders"
                continue
            if first == "Fields":
                mode, headers = "fields_headers", []
                continue

            if mode in ("bindings_headers", "fields_headers"):
                headers = ["" if cell is None else str(cell) for cell in cells]
                if headers:
                    mode = "bindings" if mode == "bindings_headers" else "fields"
                continue

            if mode is None or all(cell is None for cell in cells):
                continue

            if mode == "placeholders":
                # Pad short rows so a sheet narrower than five columns cannot
                # raise IndexError; missing cells read as None / not required.
                width = len(_PLACEHOLDER_COLUMNS)
                padded = (cells + [None] * width)[:width]
                record = dict(zip(_PLACEHOLDER_COLUMNS, padded))
                record["required"] = bool(record["required"])
                placeholders.append(record)
                continue

            item = dict(zip(headers, cells))

            if mode == "bindings":
                if not item.get("entity") and not item.get("template_kind"):
                    continue
                record = {name: item.get(name) for name in _BINDING_COLUMNS}
                record["meta"] = {}
                bindings.append(record)
                continue

            # mode == "fields"
            if item.get("key") is None and item.get("label") is None:
                continue
            record = {name: item.get(name) for name in _FIELD_COLUMNS}
            record["required"] = bool(record["required"])
            record["options"] = []
            record["meta"] = {}
            fields.append(record)

        return bindings, placeholders, fields


@dataclass
class ExcelWorkbookCodec:
    """Read and write exchange data workbooks."""

    def export_rows(
        self,
        headers: list[str],
        rows: list[dict[str, Any]],
        *,
        sheet_name: str = "Export",
    ) -> bytes:
        """Serialise *rows* to an xlsx workbook with *headers* as its columns.

        Args:
            headers: Column names, in output order.
            rows: One dict per data row.
            sheet_name: Requested sheet name; sanitised with ``_safe_sheet_name``.

        Returns:
            The workbook content as bytes.
        """
        buffer = BytesIO()
        dataframe = pd.DataFrame.from_records(rows, columns=headers)
        with pd.ExcelWriter(buffer, engine="openpyxl") as writer:
            dataframe.to_excel(
                writer,
                index=False,
                sheet_name=_safe_sheet_name(sheet_name),
            )
        return buffer.getvalue()

    def import_rows(self, content: bytes) -> list[dict[str, Any]]:
        """Read the active sheet of *content* into a list of row dicts.

        The first row supplies the keys; columns with an empty header are
        dropped.  An empty sheet yields an empty list.
        """
        dataframe = self._read_sheet(content)
        # ``iloc[0]`` below needs at least one row, so bail out on any empty frame.
        if dataframe.empty:
            return []
        headers, body = self._split_header(dataframe)
        return self._frame_to_records(body, headers)

    def import_rows_with_fields(
        self,
        content: bytes,
        *,
        fields: list[ExchangeField],
    ) -> list[dict[str, Any]]:
        """Like ``import_rows``, but rename columns according to *fields*.

        A column whose header equals ``field.workbook_header()`` is stored
        under ``field.import_target_key()``; other columns keep their header.
        """
        dataframe = self._read_sheet(content)
        if dataframe.empty:
            return []
        header_map = {field.workbook_header(): field.import_target_key() for field in fields}
        headers, body = self._split_header(dataframe)
        return self._frame_to_records(body, headers, header_map=header_map)

    def export_rows_with_template(
        self,
        *,
        fields: list[ExchangeField],
        rows: list[dict[str, Any]],
        sheet_name: str = "Export",
    ) -> bytes:
        """Export *rows* using *fields* to choose columns and source keys.

        Each output column is ``field.workbook_header()`` and its value is
        ``row.get(field.export_source_key())``.
        """
        headers = [field.workbook_header() for field in fields]
        normalized_rows: list[dict[str, Any]] = [
            {field.workbook_header(): row.get(field.export_source_key()) for field in fields}
            for row in rows
        ]
        return self.export_rows(headers, normalized_rows, sheet_name=sheet_name)

    def export_rows_with_plan(
        self,
        *,
        plan: ExchangePlan,
        rows: list[dict[str, Any]],
        sheet_name: str | None = None,
    ) -> bytes:
        """Export *rows* using the fields of *plan*.

        The sheet name is *sheet_name*, else ``plan.sheet_name``, else ``"Export"``.
        """
        return self.export_rows_with_template(
            fields=list(plan.fields),
            rows=rows,
            sheet_name=sheet_name or plan.sheet_name or "Export",
        )

    def _read_sheet(self, content: bytes) -> pd.DataFrame:
        """Load the workbook's active sheet as a header-less object DataFrame."""
        # pandas selects sheets by name or position, so the active sheet's
        # title is looked up with openpyxl first.
        workbook = load_workbook(BytesIO(content), read_only=True, data_only=True)
        try:
            sheet_name = workbook.active.title
        finally:
            # Always release the read-only workbook, even if the lookup fails.
            workbook.close()
        return pd.read_excel(
            BytesIO(content),
            sheet_name=sheet_name,
            header=None,
            dtype=object,
            engine="openpyxl",
        )

    def _split_header(self, dataframe: pd.DataFrame) -> tuple[list[str], pd.DataFrame]:
        """Return the first row as header names and the remaining rows as data."""
        headers = [self._header_name(value) for value in dataframe.iloc[0].tolist()]
        return headers, dataframe.iloc[1:]

    def _frame_to_records(
        self,
        dataframe: pd.DataFrame,
        headers: list[str],
        *,
        header_map: dict[str, str] | None = None,
    ) -> list[dict[str, Any]]:
        """Convert *dataframe* rows to dicts keyed by *headers*.

        Columns with an empty header are skipped, keys are translated through
        *header_map* when given, and missing values become ``None``.
        """
        # Resolve the (column index, output key) pairs once rather than per row.
        columns = [
            (index, header_map.get(header, header) if header_map is not None else header)
            for index, header in enumerate(headers)
            if header
        ]
        result: list[dict[str, Any]] = []
        for values in dataframe.itertuples(index=False, name=None):
            width = len(values)
            result.append(
                {
                    key: self._normalize_value(values[index] if index < width else None)
                    for index, key in columns
                }
            )
        return result

    @staticmethod
    def _normalize_value(value: Any) -> Any:
        """Return ``None`` for values pandas considers missing, else *value*."""
        return None if pd.isna(value) else value

    @staticmethod
    def _header_name(value: Any) -> str:
        """Return *value* as a header string, or ``""`` when it is missing."""
        normalized = ExcelWorkbookCodec._normalize_value(value)
        return "" if normalized is None else str(normalized)


def _safe_sheet_name(value: str) -> str:
    """Return *value* reduced to a usable sheet name.

    Removes the characters listed in ``_INVALID_SHEET_CHARS``, strips
    surrounding whitespace, substitutes ``"Sheet"`` when nothing is left and
    truncates the result to 31 characters.
    """
    cleaned = "".join(ch for ch in value if ch not in _INVALID_SHEET_CHARS).strip()
    if not cleaned:
        cleaned = "Sheet"
    return cleaned[:_MAX_SHEET_NAME_LENGTH]


def _enum_value(value: Any) -> Any:
    """Return ``value.value`` when *value* has such an attribute, else *value*."""
    return value.value if hasattr(value, "value") else value