You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
238 lines
7.8 KiB
Python
238 lines
7.8 KiB
Python
from __future__ import annotations
|
|
|
|
from dataclasses import dataclass
|
|
from typing import Any
|
|
|
|
from sqlalchemy import select
|
|
from sqlalchemy.orm import Session
|
|
|
|
from iti.modules import ModuleMenuSeed
|
|
from iti.modules.registry import ModuleRegistry
|
|
|
|
from iti_system.enums import GenderEnum, StatusEnum
|
|
from iti_system.models import Role, SysConfig, SysDept, SysDictData, SysDictType, SysMenu, User
|
|
|
|
|
|
@dataclass
class SeedCounter:
    """Running tally of what a seeding step did with each record it visited."""

    # Records that did not exist yet and were inserted.
    created: int = 0
    # Records that existed and had at least one field changed.
    updated: int = 0
    # Records that were left untouched.
    skipped: int = 0

    def as_dict(self) -> dict[str, int]:
        """Return the three tallies as a plain ``dict`` keyed by field name."""
        return dict(created=self.created, updated=self.updated, skipped=self.skipped)
|
|
|
|
|
|
# Roles that are ensured to exist on every seeding run; looked up by "code".
DEFAULT_ROLES = [
    {
        "name": "管理员",
        "code": "ADMIN",
        "desc": "系统默认管理员",
        "sort": 0,
    },
    {
        "name": "普通角色",
        "code": "COMMON",
        "desc": "一般角色",
        "sort": 100,
    },
]
|
|
|
|
# Accounts created when no user with the same "username" exists yet.
# "password" is passed to ``User.set_password`` and "role_codes" is consumed
# by the user/role linking step; neither is handed to the ``User`` constructor.
DEFAULT_USERS = [
    {
        "username": "admin",
        "password": "123456",
        "realname": "管理员",
        "phone": "18888888888",
        "email": "a@a.com",
        "avatar": "",
        "gender": GenderEnum.SECURE.value,
        "desc": "系统默认管理员",
        "role_codes": [
            "ADMIN",
        ],
    },
]
|
|
|
|
# Dictionary types and their entries. Each type is looked up by "type_code";
# each entry under "data" is looked up by the pair (type_code, code).
DEFAULT_DICT_TYPES = [
    {
        "type_name": "状态",
        "type_code": "status",
        "desc": "通用启停状态",
        "sort": 1,
        "data": [
            {
                "label": "启用",
                "code": "enabled",
                "value": "enabled",
                "sort": 1,
            },
            {
                "label": "停用",
                "code": "disabled",
                "value": "disabled",
                "sort": 2,
            },
        ],
    },
    {
        "type_name": "性别",
        "type_code": "gender",
        "desc": "用户性别",
        "sort": 2,
        "data": [
            {
                "label": "男",
                "code": "male",
                "value": "male",
                "sort": 1,
            },
            {
                "label": "女",
                "code": "female",
                "value": "female",
                "sort": 2,
            },
            {
                "label": "保密",
                "code": "secure",
                "value": "secure",
                "sort": 3,
            },
        ],
    },
]
|
|
|
|
# Configuration rows, looked up by the pair ("type", "code").
DEFAULT_CONFIGS = [
    {
        "type": "USER",
        "name": "默认用户密码",
        "code": "DEFAULT_USER_PASSWORD",
        "value": "123456",
        "sort": 1,
    },
    {
        "type": "USER",
        "name": "默认用户角色",
        "code": "DEFAULT_USER_ROLES",
        "value": "COMMON",
        "sort": 2,
    },
    {
        "type": "SYSTEM",
        "name": "系统名称",
        "code": "system.name",
        "value": "iTi-Flask",
        "sort": 10,
    },
    {
        "type": "SYSTEM",
        "name": "后端访问地址",
        "code": "BACKEND_URL",
        "value": "http://localhost:8000",
        "sort": 90,
    },
]
|
|
|
|
|
|
def seed_system_data(
    db: Session,
    module_registry: ModuleRegistry | None = None,
) -> dict[str, dict[str, int]]:
    """Seed the default system records and commit them.

    Roles, module menus, dictionaries, configs and users are seeded first and
    flushed; the user/role and role/menu links are seeded afterwards, then the
    session is committed.

    Args:
        db: Session used for every lookup and insert; committed at the end.
        module_registry: Optional registry supplying the module menu seeds.

    Returns:
        A mapping from section name ("roles", "menus", "dicts", "configs",
        "users", "user_roles", "role_menus") to that section's
        created/updated/skipped counts.
    """
    results: dict[str, SeedCounter] = {}

    # Stand-alone records first.
    results["roles"] = _seed_roles(db)
    results["menus"] = _seed_module_menus(db, module_registry)
    results["dicts"] = _seed_dicts(db)
    results["configs"] = _seed_configs(db)
    results["users"] = _seed_users(db)
    db.flush()

    # Association links are seeded only after the flush above.
    results["user_roles"] = _seed_user_roles(db)
    results["role_menus"] = _seed_role_menus(db, module_registry)
    db.commit()

    summary: dict[str, dict[str, int]] = {}
    for section, tally in results.items():
        summary[section] = tally.as_dict()
    return summary
|
|
|
|
|
|
def _seed_roles(db: Session) -> SeedCounter:
    """Create or refresh every role listed in ``DEFAULT_ROLES``.

    Roles are matched on ``Role.code``. The seeded fields, plus an enabled
    status, are written to a new row or assigned onto the existing one.
    """
    tally = SeedCounter()
    for spec in DEFAULT_ROLES:
        found = db.scalar(select(Role).where(Role.code == spec["code"]))
        fields = dict(spec, status=StatusEnum.ENABLED.value)
        if found is None:
            db.add(Role(**fields))
            tally.created += 1
            continue
        if _assign(found, fields):
            tally.updated += 1
        else:
            tally.skipped += 1
    return tally
|
|
|
|
|
|
def _seed_module_menus(db: Session, module_registry: ModuleRegistry | None) -> SeedCounter:
    """Create or refresh a ``SysMenu`` row for each module menu seed.

    Each seed's payload is matched to an existing menu by primary key, taken
    from the payload's ``"id"`` entry.
    """
    tally = SeedCounter()
    for seed in _module_menu_seeds(module_registry):
        fields = seed.as_menu_payload()
        found = db.get(SysMenu, fields["id"])
        if found is None:
            db.add(SysMenu(**fields))
            tally.created += 1
            continue
        if _assign(found, fields):
            tally.updated += 1
        else:
            tally.skipped += 1
    return tally
|
|
|
|
|
|
def _seed_dicts(db: Session) -> SeedCounter:
    """Create or refresh the dictionary types and entries in ``DEFAULT_DICT_TYPES``.

    Types are matched on ``type_code``; entries are matched on the pair
    ``(type_code, code)``. Types and entries share a single counter.
    """
    tally = SeedCounter()

    def record(found: Any, model: Any, fields: dict[str, Any]) -> None:
        # Insert when missing, otherwise sync the fields, and count the outcome.
        if found is None:
            db.add(model(**fields))
            tally.created += 1
        elif _assign(found, fields):
            tally.updated += 1
        else:
            tally.skipped += 1

    for spec in DEFAULT_DICT_TYPES:
        type_code = spec["type_code"]
        found_type = db.scalar(select(SysDictType).where(SysDictType.type_code == type_code))
        # Everything except the nested entries belongs to the type row itself.
        type_fields = {name: value for name, value in spec.items() if name != "data"}
        type_fields.setdefault("status", StatusEnum.ENABLED.value)
        record(found_type, SysDictType, type_fields)

        for entry in spec["data"]:
            lookup = select(SysDictData).where(
                SysDictData.type_code == type_code,
                SysDictData.code == entry["code"],
            )
            found_entry = db.scalar(lookup)
            entry_fields = dict(entry, type_code=type_code, status=StatusEnum.ENABLED.value)
            record(found_entry, SysDictData, entry_fields)
    return tally
|
|
|
|
|
|
def _seed_configs(db: Session) -> SeedCounter:
    """Create or refresh every configuration row in ``DEFAULT_CONFIGS``.

    Rows are matched on the pair ``(type, code)``; the seeded fields plus an
    enabled status are written to a new row or assigned onto the existing one.
    """
    tally = SeedCounter()
    for spec in DEFAULT_CONFIGS:
        lookup = select(SysConfig).where(
            SysConfig.type == spec["type"],
            SysConfig.code == spec["code"],
        )
        found = db.scalar(lookup)
        fields = dict(spec, status=StatusEnum.ENABLED.value)
        if found is None:
            db.add(SysConfig(**fields))
            tally.created += 1
            continue
        if _assign(found, fields):
            tally.updated += 1
        else:
            tally.skipped += 1
    return tally
|
|
|
|
|
|
def _seed_users(db: Session) -> SeedCounter:
    """Create the accounts in ``DEFAULT_USERS`` that do not exist yet.

    Users are matched on ``username``. An existing account is never modified;
    it is only counted as skipped.
    """
    tally = SeedCounter()
    for spec in DEFAULT_USERS:
        already_there = db.scalar(select(User).where(User.username == spec["username"]))
        if already_there is None:
            # "password" goes through set_password and "role_codes" is handled
            # by the role-linking step, so neither is a constructor argument.
            profile = {
                name: value
                for name, value in spec.items()
                if name not in ("password", "role_codes")
            }
            account = User(status=StatusEnum.ENABLED.value, **profile)
            account.set_password(spec["password"])
            db.add(account)
            tally.created += 1
        else:
            tally.skipped += 1
    return tally
|
|
|
|
|
|
def _seed_user_roles(db: Session) -> SeedCounter:
    """Link each default user to the roles named in its ``role_codes``.

    Users are matched on ``username`` and roles on ``Role.code``. A link is
    counted as skipped when the user is missing, the role is missing, or the
    user already holds the role.

    Returns:
        Counter whose ``created`` is the number of roles newly appended to
        ``user.roles``.
    """
    counter = SeedCounter()
    for item in DEFAULT_USERS:
        user = db.scalar(select(User).where(User.username == item["username"]))
        if user is None:
            counter.skipped += 1
            continue

        linked = {role.code for role in user.roles}
        for role_code in item["role_codes"]:
            if role_code in linked:
                counter.skipped += 1
                continue
            role = db.scalar(select(Role).where(Role.code == role_code))
            if role is None:
                counter.skipped += 1
                continue
            user.roles.append(role)
            # Remember the new link so a code repeated in role_codes is not
            # appended to the collection a second time.
            linked.add(role_code)
            counter.created += 1
    return counter
|
|
|
|
|
|
def _seed_role_menus(db: Session, module_registry: ModuleRegistry | None) -> SeedCounter:
    """Grant the ``ADMIN`` role every menu declared by the module menu seeds.

    Returns an all-zero counter when no role with code ``"ADMIN"`` exists.
    A seed is counted as skipped when the role already has that menu or when
    no ``SysMenu`` row with the seed's id can be loaded.

    Returns:
        Counter whose ``created`` is the number of menus newly appended to
        ``admin.menus``.
    """
    counter = SeedCounter()
    admin = db.scalar(select(Role).where(Role.code == "ADMIN"))
    if admin is None:
        return counter

    linked = {menu.id for menu in admin.menus}
    for menu_seed in _module_menu_seeds(module_registry):
        if menu_seed.id in linked:
            counter.skipped += 1
            continue
        menu = db.get(SysMenu, menu_seed.id)
        if menu is None:
            counter.skipped += 1
            continue
        admin.menus.append(menu)
        # Remember the new link so a menu id that appears in more than one
        # seed is not appended to the collection a second time.
        linked.add(menu_seed.id)
        counter.created += 1
    return counter
|
|
|
|
|
|
def _module_menu_seeds(module_registry: ModuleRegistry | None) -> list[ModuleMenuSeed]:
|
|
return module_registry.list_menu_seeds() if module_registry else []
|
|
|
|
|
|
def _assign(target: Any, payload: dict[str, Any]) -> bool:
|
|
changed = False
|
|
for key, value in payload.items():
|
|
if getattr(target, key, None) != value:
|
|
setattr(target, key, value)
|
|
changed = True
|
|
return changed
|