from __future__ import annotations from dataclasses import dataclass from typing import Any from sqlalchemy import select from sqlalchemy.orm import Session from iti.modules import ModuleMenuSeed from iti.modules.registry import ModuleRegistry from iti_system.enums import GenderEnum, StatusEnum from iti_system.models import Role, SysConfig, SysDept, SysDictData, SysDictType, SysMenu, User @dataclass class SeedCounter: created: int = 0 updated: int = 0 skipped: int = 0 def as_dict(self) -> dict[str, int]: return {"created": self.created, "updated": self.updated, "skipped": self.skipped} DEFAULT_ROLES = [ {"name": "管理员", "code": "ADMIN", "desc": "系统默认管理员", "sort": 0}, {"name": "普通角色", "code": "COMMON", "desc": "一般角色", "sort": 100}, ] DEFAULT_USERS = [ { "username": "admin", "password": "123456", "realname": "管理员", "phone": "18888888888", "email": "a@a.com", "avatar": "", "gender": GenderEnum.SECURE.value, "desc": "系统默认管理员", "role_codes": ["ADMIN"], }, ] DEFAULT_DICT_TYPES = [ { "type_name": "状态", "type_code": "status", "desc": "通用启停状态", "sort": 1, "data": [ {"label": "启用", "code": "enabled", "value": "enabled", "sort": 1}, {"label": "停用", "code": "disabled", "value": "disabled", "sort": 2}, ], }, { "type_name": "性别", "type_code": "gender", "desc": "用户性别", "sort": 2, "data": [ {"label": "男", "code": "male", "value": "male", "sort": 1}, {"label": "女", "code": "female", "value": "female", "sort": 2}, {"label": "保密", "code": "secure", "value": "secure", "sort": 3}, ], }, ] DEFAULT_CONFIGS = [ {"type": "USER", "name": "默认用户密码", "code": "DEFAULT_USER_PASSWORD", "value": "123456", "sort": 1}, {"type": "USER", "name": "默认用户角色", "code": "DEFAULT_USER_ROLES", "value": "COMMON", "sort": 2}, {"type": "SYSTEM", "name": "系统名称", "code": "system.name", "value": "iTi-Flask", "sort": 10}, {"type": "SYSTEM", "name": "后端访问地址", "code": "BACKEND_URL", "value": "http://localhost:8000", "sort": 90}, ] def seed_system_data( db: Session, module_registry: ModuleRegistry | None = None, ) -> dict[str, dict[str, int]]: counters = { "roles": _seed_roles(db), "menus": _seed_module_menus(db, module_registry), "dicts": _seed_dicts(db), "configs": _seed_configs(db), "users": _seed_users(db), } db.flush() counters["user_roles"] = _seed_user_roles(db) counters["role_menus"] = _seed_role_menus(db, module_registry) db.commit() return {name: counter.as_dict() for name, counter in counters.items()} def _seed_roles(db: Session) -> SeedCounter: counter = SeedCounter() for item in DEFAULT_ROLES: role = db.scalar(select(Role).where(Role.code == item["code"])) payload = {**item, "status": StatusEnum.ENABLED.value} if role is None: db.add(Role(**payload)) counter.created += 1 elif _assign(role, payload): counter.updated += 1 else: counter.skipped += 1 return counter def _seed_module_menus(db: Session, module_registry: ModuleRegistry | None) -> SeedCounter: counter = SeedCounter() for menu_seed in _module_menu_seeds(module_registry): payload = menu_seed.as_menu_payload() menu = db.get(SysMenu, payload["id"]) if menu is None: db.add(SysMenu(**payload)) counter.created += 1 elif _assign(menu, payload): counter.updated += 1 else: counter.skipped += 1 return counter def _seed_dicts(db: Session) -> SeedCounter: counter = SeedCounter() for item in DEFAULT_DICT_TYPES: dict_type = db.scalar(select(SysDictType).where(SysDictType.type_code == item["type_code"])) payload = {key: value for key, value in item.items() if key != "data"} payload.setdefault("status", StatusEnum.ENABLED.value) if dict_type is None: db.add(SysDictType(**payload)) counter.created += 1 elif _assign(dict_type, payload): counter.updated += 1 else: counter.skipped += 1 for data in item["data"]: dict_data = db.scalar( select(SysDictData).where( SysDictData.type_code == item["type_code"], SysDictData.code == data["code"], ) ) data_payload = {**data, "type_code": item["type_code"], "status": StatusEnum.ENABLED.value} if dict_data is None: db.add(SysDictData(**data_payload)) counter.created += 1 elif _assign(dict_data, data_payload): counter.updated += 1 else: counter.skipped += 1 return counter def _seed_configs(db: Session) -> SeedCounter: counter = SeedCounter() for item in DEFAULT_CONFIGS: config = db.scalar(select(SysConfig).where(SysConfig.type == item["type"], SysConfig.code == item["code"])) payload = {**item, "status": StatusEnum.ENABLED.value} if config is None: db.add(SysConfig(**payload)) counter.created += 1 elif _assign(config, payload): counter.updated += 1 else: counter.skipped += 1 return counter def _seed_users(db: Session) -> SeedCounter: counter = SeedCounter() for item in DEFAULT_USERS: user = db.scalar(select(User).where(User.username == item["username"])) if user is not None: counter.skipped += 1 continue payload = {key: value for key, value in item.items() if key not in {"password", "role_codes"}} user = User(status=StatusEnum.ENABLED.value, **payload) user.set_password(item["password"]) db.add(user) counter.created += 1 return counter def _seed_user_roles(db: Session) -> SeedCounter: counter = SeedCounter() for item in DEFAULT_USERS: user = db.scalar(select(User).where(User.username == item["username"])) if user is None: counter.skipped += 1 continue existing = {role.code for role in user.roles} for role_code in item["role_codes"]: if role_code in existing: counter.skipped += 1 continue role = db.scalar(select(Role).where(Role.code == role_code)) if role is None: counter.skipped += 1 continue user.roles.append(role) counter.created += 1 return counter def _seed_role_menus(db: Session, module_registry: ModuleRegistry | None) -> SeedCounter: counter = SeedCounter() admin = db.scalar(select(Role).where(Role.code == "ADMIN")) if admin is None: return counter existing = {menu.id for menu in admin.menus} for menu_seed in _module_menu_seeds(module_registry): if menu_seed.id in existing: counter.skipped += 1 continue menu = db.get(SysMenu, menu_seed.id) if menu is None: counter.skipped += 1 continue admin.menus.append(menu) counter.created += 1 return counter def _module_menu_seeds(module_registry: ModuleRegistry | None) -> list[ModuleMenuSeed]: return module_registry.list_menu_seeds() if module_registry else [] def _assign(target: Any, payload: dict[str, Any]) -> bool: changed = False for key, value in payload.items(): if getattr(target, key, None) != value: setattr(target, key, value) changed = True return changed