You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
iTi-Flask/iti/exchange/excel.py

363 lines
13 KiB
Python

from __future__ import annotations
from dataclasses import dataclass
from io import BytesIO
from typing import Any
import pandas as pd
from openpyxl import Workbook, load_workbook
from openpyxl.worksheet.worksheet import Worksheet
from .base import ExchangeField, ExchangePlaceholder, ExchangePlan, ExchangeTemplateSnapshot
@dataclass
class ExcelTemplateCodec:
    """Render template snapshots/plans to a workbook and parse them back.

    Layout produced by :meth:`build_workbook` (single worksheet):

    * ``A1`` title, ``A2`` ``"version: <version>"``, ``A3`` description.
    * Starting at row 5, in this order:
      - optional ``Bindings`` section: title row, header row, one row per
        binding;
      - optional ``Placeholders`` section: title row, then one row per
        placeholder (no header row);
      - ``Fields`` section: title row, header row, one row per field.

    :meth:`load` reads the same layout back into plain dictionaries.
    """

    # Text written in front of the version in A2; removed again by ``load``
    # so that a dump/load cycle returns the version that was written.
    _VERSION_PREFIX = "version: "

    # Column order shared by the writer and the parser of each section.
    _BINDING_COLUMNS = (
        "entity",
        "template_kind",
        "handler",
        "description",
        "default_sheet_name",
        "default_file_name",
        "title",
    )
    _FIELD_COLUMNS = (
        "key",
        "label",
        "placeholder",
        "required",
        "example",
        "format",
        "source",
        "target",
    )

    def build_workbook(self, snapshot: ExchangeTemplateSnapshot | ExchangePlan) -> Workbook:
        """Build a one-sheet workbook describing *snapshot*.

        The sheet title is the first truthy value of ``snapshot.sheet_name``,
        ``snapshot.meta["sheet_name"]``, the first binding's
        ``default_sheet_name`` and ``"Template"``. Whatever is chosen is run
        through ``_safe_sheet_name`` so that forbidden characters and
        over-long names coming from user data cannot reach the worksheet.
        """
        workbook = Workbook()
        worksheet = workbook.active
        # ``meta`` and ``sheet_name`` are optional on the snapshot/plan object.
        meta = getattr(snapshot, "meta", None) or {}
        bindings = snapshot.bindings
        title = (
            getattr(snapshot, "sheet_name", None)
            or meta.get("sheet_name")
            or (bindings[0].default_sheet_name if bindings else None)
            or "Template"
        )
        worksheet.title = _safe_sheet_name(str(title))

        row = self._write_header(worksheet, snapshot)
        row = self._write_bindings(worksheet, snapshot, row)
        row = self._write_placeholders(worksheet, snapshot, row)
        self._write_fields(worksheet, snapshot, row)
        return workbook

    @staticmethod
    def _write_row(
        worksheet: Worksheet, row: int, values: tuple[Any, ...] | list[Any]
    ) -> None:
        """Write *values* into consecutive cells of *row*, starting at column 1."""
        for column, value in enumerate(values, start=1):
            worksheet.cell(row=row, column=column, value=value)

    def _write_header(
        self, worksheet: Worksheet, snapshot: ExchangeTemplateSnapshot | ExchangePlan
    ) -> int:
        """Write title/version/description into A1..A3.

        Attributes on *snapshot* take precedence over ``meta`` for title and
        version; ``meta`` takes precedence for the description.

        Returns:
            The first row available for the sections (always 5).
        """
        meta = getattr(snapshot, "meta", None) or {}
        worksheet["A1"] = (
            getattr(snapshot, "title", None)
            or meta.get("title")
            or getattr(snapshot, "template_id", None)
            or "Template"
        )
        version = getattr(snapshot, "version", None) or meta.get("version")
        if version:
            worksheet["A2"] = f"{self._VERSION_PREFIX}{version}"
        description = meta.get("description") or getattr(snapshot, "description", None)
        if description:
            worksheet["A3"] = description
        return 5

    def _write_bindings(
        self, worksheet: Worksheet, snapshot: ExchangeTemplateSnapshot | ExchangePlan, row: int
    ) -> int:
        """Write the ``Bindings`` section starting at *row*.

        Nothing is written when the snapshot has no bindings.

        Returns:
            The next free row.
        """
        if not snapshot.bindings:
            return row
        worksheet.cell(row=row, column=1, value="Bindings")
        row += 1
        self._write_row(worksheet, row, self._BINDING_COLUMNS)
        row += 1
        for binding in snapshot.bindings:
            self._write_row(
                worksheet,
                row,
                [
                    binding.entity,
                    # Store the enum's underlying value, not the enum member.
                    _enum_value(binding.template_kind),
                    binding.handler,
                    binding.description,
                    binding.default_sheet_name,
                    binding.default_file_name,
                    binding.title,
                ],
            )
            row += 1
        return row

    def _write_placeholders(
        self, worksheet: Worksheet, snapshot: ExchangeTemplateSnapshot | ExchangePlan, row: int
    ) -> int:
        """Write the ``Placeholders`` section (no header row) starting at *row*.

        Nothing is written when the snapshot has no placeholders.

        Returns:
            The next free row.
        """
        if not snapshot.placeholders:
            return row
        worksheet.cell(row=row, column=1, value="Placeholders")
        row += 1
        for placeholder in snapshot.placeholders:
            self._write_row(
                worksheet,
                row,
                [
                    placeholder.key,
                    placeholder.label,
                    placeholder.description,
                    placeholder.example,
                    placeholder.required,
                ],
            )
            row += 1
        return row

    def _write_fields(
        self, worksheet: Worksheet, snapshot: ExchangeTemplateSnapshot | ExchangePlan, row: int
    ) -> None:
        """Write the ``Fields`` section (title, headers, one row per field)."""
        worksheet.cell(row=row, column=1, value="Fields")
        row += 1
        self._write_row(worksheet, row, self._FIELD_COLUMNS)
        row += 1
        for field in snapshot.fields:
            self._write_row(
                worksheet,
                row,
                [
                    field.key,
                    field.label,
                    field.placeholder,
                    field.required,
                    field.example,
                    field.format,
                    field.source,
                    field.target,
                ],
            )
            row += 1

    def dump(self, snapshot: ExchangeTemplateSnapshot | ExchangePlan) -> bytes:
        """Return the workbook built for *snapshot* as file bytes."""
        buffer = BytesIO()
        self.build_workbook(snapshot).save(buffer)
        return buffer.getvalue()

    def load(self, content: bytes) -> dict[str, Any]:
        """Parse workbook bytes produced by :meth:`dump` into a dictionary.

        Returns:
            A dict with the keys ``title``, ``version``, ``description``,
            ``sheet_name``, ``bindings``, ``placeholders`` and ``fields``.
            ``version`` is returned without the ``"version: "`` prefix that
            the writer puts into ``A2``.
        """
        workbook = load_workbook(BytesIO(content))
        worksheet = workbook.active
        bindings, placeholders, fields = self._parse_sections(worksheet)
        return {
            "title": worksheet["A1"].value,
            "version": self._strip_version_prefix(worksheet["A2"].value),
            "description": worksheet["A3"].value,
            "sheet_name": worksheet.title,
            "bindings": bindings,
            "placeholders": placeholders,
            "fields": fields,
        }

    @classmethod
    def _strip_version_prefix(cls, value: Any) -> Any:
        """Remove the writer's ``"version: "`` prefix; pass other values through."""
        if isinstance(value, str) and value.startswith(cls._VERSION_PREFIX):
            return value[len(cls._VERSION_PREFIX):]
        return value

    def _parse_sections(
        self, worksheet: Worksheet
    ) -> tuple[list[dict[str, Any]], list[dict[str, Any]], list[dict[str, Any]]]:
        """Scan all rows and collect the three sections.

        A row whose first cell is ``Bindings``, ``Placeholders`` or ``Fields``
        starts the corresponding section. Rows before the first section title
        and completely empty rows are ignored. For ``Bindings`` and ``Fields``
        the first non-empty row after the title is the header row; data cells
        are matched to those headers by position.

        Returns:
            ``(bindings, placeholders, fields)`` as lists of dictionaries.
        """
        bindings: list[dict[str, Any]] = []
        placeholders: list[dict[str, Any]] = []
        fields: list[dict[str, Any]] = []
        mode: str | None = None
        headers: list[str] = []

        for row in worksheet.iter_rows(values_only=True):
            cells = list(row)
            first = cells[0] if cells else None

            if first == "Bindings":
                mode, headers = "bindings_headers", []
                continue
            if first == "Placeholders":
                mode = "placeholders"
                continue
            if first == "Fields":
                mode, headers = "fields_headers", []
                continue

            # Blank rows carry no information in any mode. In particular a
            # blank row between a section title and its header row must not
            # be mistaken for the header row.
            if mode is None or all(cell is None for cell in cells):
                continue

            if mode in ("bindings_headers", "fields_headers"):
                headers = ["" if cell is None else str(cell) for cell in cells]
                mode = "bindings" if mode == "bindings_headers" else "fields"
                continue

            if mode == "placeholders":
                # Pad so that sheets narrower than five columns do not raise.
                key, label, description, example, required = (cells + [None] * 5)[:5]
                placeholders.append(
                    {
                        "key": key,
                        "label": label,
                        "description": description,
                        "example": example,
                        "required": bool(required),
                    }
                )
                continue

            # zip() stops at the shorter of headers/cells.
            item = dict(zip(headers, cells))

            if mode == "bindings":
                if not item.get("entity") and not item.get("template_kind"):
                    continue
                binding = {name: item.get(name) for name in self._BINDING_COLUMNS}
                binding["meta"] = {}
                bindings.append(binding)
                continue

            # mode == "fields"
            if item.get("key") is None and item.get("label") is None:
                continue
            field = {name: item.get(name) for name in self._FIELD_COLUMNS}
            field["required"] = bool(field["required"])
            field["options"] = []
            field["meta"] = {}
            fields.append(field)

        return bindings, placeholders, fields
@dataclass
class ExcelWorkbookCodec:
    """Read and write exchange data workbooks.

    Exported workbooks contain one sheet with a header row followed by one
    row per record. Imports read the active sheet and treat its first row as
    the header row.
    """

    def export_rows(
        self,
        headers: list[str],
        rows: list[dict[str, Any]],
        *,
        sheet_name: str = "Export",
    ) -> bytes:
        """Serialise *rows* to workbook bytes with *headers* as the columns.

        Args:
            headers: Column names, in output order.
            rows: Records keyed by header name.
            sheet_name: Desired sheet title; passed through
                ``_safe_sheet_name`` before use.
        """
        buffer = BytesIO()
        dataframe = pd.DataFrame.from_records(rows, columns=headers)
        with pd.ExcelWriter(buffer, engine="openpyxl") as writer:
            dataframe.to_excel(
                writer,
                index=False,
                sheet_name=_safe_sheet_name(sheet_name),
            )
        return buffer.getvalue()

    def import_rows(self, content: bytes) -> list[dict[str, Any]]:
        """Read the active sheet and return one dict per data row.

        Keys are the header cells of the first row; columns with an empty
        header are skipped. A sheet without any rows yields ``[]``.
        """
        return self._import_records(content, fields=None)

    def import_rows_with_fields(
        self,
        content: bytes,
        *,
        fields: list[ExchangeField],
    ) -> list[dict[str, Any]]:
        """Like :meth:`import_rows`, but rename headers using *fields*.

        A header equal to ``field.workbook_header()`` is stored under
        ``field.import_target_key()``; headers that match no field keep
        their own name.
        """
        return self._import_records(content, fields=fields)

    def _import_records(
        self,
        content: bytes,
        *,
        fields: list[ExchangeField] | None,
    ) -> list[dict[str, Any]]:
        """Shared implementation of the two public import methods."""
        dataframe = self._read_sheet(content)
        # ``DataFrame.empty`` is true when either axis has length zero, so
        # this also covers a frame that has columns but no rows, for which
        # ``iloc[0]`` below would raise IndexError.
        if dataframe.empty:
            return []
        header_map = None
        if fields is not None:
            header_map = {
                field.workbook_header(): field.import_target_key() for field in fields
            }
        headers = [self._header_name(value) for value in dataframe.iloc[0].tolist()]
        return self._frame_to_records(dataframe.iloc[1:], headers, header_map=header_map)

    def export_rows_with_template(
        self,
        *,
        fields: list[ExchangeField],
        rows: list[dict[str, Any]],
        sheet_name: str = "Export",
    ) -> bytes:
        """Export *rows* using *fields* to pick and name the columns.

        Each column is titled ``field.workbook_header()`` and filled from
        ``row.get(field.export_source_key())``; missing keys become empty
        cells.
        """
        headers = [field.workbook_header() for field in fields]
        normalized_rows = [
            {field.workbook_header(): row.get(field.export_source_key()) for field in fields}
            for row in rows
        ]
        return self.export_rows(headers, normalized_rows, sheet_name=sheet_name)

    def export_rows_with_plan(
        self,
        *,
        plan: ExchangePlan,
        rows: list[dict[str, Any]],
        sheet_name: str | None = None,
    ) -> bytes:
        """Export *rows* using the fields of *plan*.

        The sheet title is *sheet_name*, else ``plan.sheet_name``, else
        ``"Export"``.
        """
        return self.export_rows_with_template(
            fields=list(plan.fields),
            rows=rows,
            sheet_name=sheet_name or plan.sheet_name or "Export",
        )

    def _read_sheet(self, content: bytes) -> pd.DataFrame:
        """Load the active sheet of the workbook as a header-less object frame."""
        # Open the workbook once just to learn which sheet is active; pandas
        # is then told to read that sheet by name.
        workbook = load_workbook(BytesIO(content), read_only=True, data_only=True)
        try:
            sheet_name = workbook.active.title
        finally:
            # Always release the read-only workbook, even if no active sheet
            # could be resolved.
            workbook.close()
        return pd.read_excel(
            BytesIO(content),
            sheet_name=sheet_name,
            header=None,
            dtype=object,
            engine="openpyxl",
        )

    def _frame_to_records(
        self,
        dataframe: pd.DataFrame,
        headers: list[str],
        *,
        header_map: dict[str, str] | None = None,
    ) -> list[dict[str, Any]]:
        """Convert data rows to dictionaries keyed by (mapped) header.

        Args:
            dataframe: Data rows only (header row already removed).
            headers: Header text per column position; empty strings mark
                columns to skip.
            header_map: Optional header -> key renaming; unmapped headers
                keep their own name.
        """
        result: list[dict[str, Any]] = []
        for values in dataframe.itertuples(index=False, name=None):
            item: dict[str, Any] = {}
            for index, header in enumerate(headers):
                if not header:
                    continue
                key = header_map.get(header, header) if header_map is not None else header
                value = values[index] if index < len(values) else None
                item[key] = self._normalize_value(value)
            result.append(item)
        return result

    @staticmethod
    def _normalize_value(value: Any) -> Any:
        """Map pandas missing values (``NaN``/``NaT``/``None``) to ``None``."""
        return None if pd.isna(value) else value

    @staticmethod
    def _header_name(value: Any) -> str:
        """Return the header cell as text; missing cells become ``""``."""
        normalized = ExcelWorkbookCodec._normalize_value(value)
        return "" if normalized is None else str(normalized)
def _safe_sheet_name(value: str) -> str:
cleaned = "".join(ch for ch in value if ch not in "[]:*?/\\")
cleaned = cleaned.strip()
if not cleaned:
cleaned = "Sheet"
return cleaned[:31]
def _enum_value(value: Any) -> Any:
return value.value if hasattr(value, "value") else value