|
|
"""
|
|
|
用户扩展属性服务层
|
|
|
"""
|
|
|
from typing import Dict, List, Optional
|
|
|
from sqlalchemy import select, delete
|
|
|
from iti.applications.extensions import db
|
|
|
from iti.applications.models import User, SysUserAttribute
|
|
|
from iti.applications.common.exceptions.biz_exp import BizException
|
|
|
|
|
|
|
|
|
def get_user_attributes(user_id: str, group: Optional[str] = None) -> Dict:
|
|
|
"""
|
|
|
获取用户扩展属性
|
|
|
:param user_id: 用户ID
|
|
|
:param group: 属性分组(可选,不传则返回所有分组)
|
|
|
:return: 属性字典
|
|
|
"""
|
|
|
query = select(SysUserAttribute).filter_by(user_id=user_id)
|
|
|
if group:
|
|
|
query = query.filter_by(attr_group=group)
|
|
|
|
|
|
attributes = db.session.scalars(query.order_by(SysUserAttribute.sort)).all()
|
|
|
|
|
|
result = {}
|
|
|
for attr in attributes:
|
|
|
if attr.attr_group not in result:
|
|
|
result[attr.attr_group] = {}
|
|
|
result[attr.attr_group][attr.attr_key] = attr.get_typed_value()
|
|
|
|
|
|
# 如果指定了分组,只返回该分组的数据
|
|
|
if group:
|
|
|
return result.get(group, {})
|
|
|
|
|
|
return result
|
|
|
|
|
|
|
|
|
def set_user_attributes(
|
|
|
user_id: str, attributes: Dict, group: Optional[str] = None
|
|
|
) -> None:
|
|
|
"""
|
|
|
设置用户扩展属性(批量更新或创建)
|
|
|
:param user_id: 用户ID
|
|
|
:param attributes: 属性字典
|
|
|
- 如果指定了 group,则 attributes 格式为 {"key1": "value1", "key2": "value2"}
|
|
|
- 如果未指定 group,则 attributes 格式为 {"group1": {"key1": "value1"}, "group2": {...}}
|
|
|
:param group: 属性分组(可选)
|
|
|
"""
|
|
|
# 验证用户是否存在
|
|
|
user = db.session.scalar(select(User).filter_by(id=user_id))
|
|
|
if not user:
|
|
|
raise BizException("用户不存在")
|
|
|
|
|
|
# 如果指定了分组,将属性包装成分组格式
|
|
|
if group:
|
|
|
attributes = {group: attributes}
|
|
|
|
|
|
for grp, attrs in attributes.items():
|
|
|
for key, value in attrs.items():
|
|
|
# 查找是否已存在
|
|
|
existing = db.session.scalar(
|
|
|
select(SysUserAttribute).filter_by(
|
|
|
user_id=user_id, attr_group=grp, attr_key=key
|
|
|
)
|
|
|
)
|
|
|
|
|
|
if existing:
|
|
|
# 更新现有属性
|
|
|
existing.set_typed_value(value)
|
|
|
else:
|
|
|
# 创建新属性
|
|
|
new_attr = SysUserAttribute(
|
|
|
user_id=user_id,
|
|
|
attr_group=grp,
|
|
|
attr_key=key,
|
|
|
attr_type="string", # 默认类型
|
|
|
)
|
|
|
new_attr.set_typed_value(value)
|
|
|
db.session.add(new_attr)
|
|
|
|
|
|
db.session.commit()
|
|
|
|
|
|
|
|
|
def delete_user_attribute(
|
|
|
user_id: str, group: str, key: Optional[str] = None
|
|
|
) -> None:
|
|
|
"""
|
|
|
删除用户扩展属性
|
|
|
:param user_id: 用户ID
|
|
|
:param group: 属性分组
|
|
|
:param key: 属性键(可选,不传则删除整个分组)
|
|
|
"""
|
|
|
query = delete(SysUserAttribute).filter_by(user_id=user_id, attr_group=group)
|
|
|
|
|
|
if key:
|
|
|
query = query.filter_by(attr_key=key)
|
|
|
|
|
|
db.session.execute(query)
|
|
|
db.session.commit()
|
|
|
|
|
|
|
|
|
def get_user_attribute_value(user_id: str, group: str, key: str):
|
|
|
"""
|
|
|
获取单个属性值
|
|
|
:param user_id: 用户ID
|
|
|
:param group: 属性分组
|
|
|
:param key: 属性键
|
|
|
:return: 属性值
|
|
|
"""
|
|
|
attr = db.session.scalar(
|
|
|
select(SysUserAttribute).filter_by(
|
|
|
user_id=user_id, attr_group=group, attr_key=key
|
|
|
)
|
|
|
)
|
|
|
|
|
|
if not attr:
|
|
|
return None
|
|
|
|
|
|
return attr.get_typed_value()
|
|
|
|
|
|
|
|
|
def set_user_attribute_value(
|
|
|
user_id: str, group: str, key: str, value, attr_type: str = "string"
|
|
|
) -> None:
|
|
|
"""
|
|
|
设置单个属性值
|
|
|
:param user_id: 用户ID
|
|
|
:param group: 属性分组
|
|
|
:param key: 属性键
|
|
|
:param value: 属性值
|
|
|
:param attr_type: 属性类型
|
|
|
"""
|
|
|
# 验证用户是否存在
|
|
|
user = db.session.scalar(select(User).filter_by(id=user_id))
|
|
|
if not user:
|
|
|
raise BizException("用户不存在")
|
|
|
|
|
|
# 查找是否已存在
|
|
|
attr = db.session.scalar(
|
|
|
select(SysUserAttribute).filter_by(
|
|
|
user_id=user_id, attr_group=group, attr_key=key
|
|
|
)
|
|
|
)
|
|
|
|
|
|
if attr:
|
|
|
# 更新现有属性
|
|
|
attr.attr_type = attr_type
|
|
|
attr.set_typed_value(value)
|
|
|
else:
|
|
|
# 创建新属性
|
|
|
new_attr = SysUserAttribute(
|
|
|
user_id=user_id,
|
|
|
attr_group=group,
|
|
|
attr_key=key,
|
|
|
attr_type=attr_type,
|
|
|
)
|
|
|
new_attr.set_typed_value(value)
|
|
|
db.session.add(new_attr)
|
|
|
|
|
|
db.session.commit()
|
|
|
|
|
|
|
|
|
def batch_set_user_attributes_with_type(
|
|
|
user_id: str, attributes: List[Dict]
|
|
|
) -> None:
|
|
|
"""
|
|
|
批量设置用户扩展属性(支持指定类型)
|
|
|
:param user_id: 用户ID
|
|
|
:param attributes: 属性列表,格式:
|
|
|
[
|
|
|
{
|
|
|
"attr_group": "erp",
|
|
|
"attr_key": "erp_username",
|
|
|
"attr_value": "ERP001",
|
|
|
"attr_type": "string",
|
|
|
"description": "ERP用户名",
|
|
|
"sort": 1
|
|
|
},
|
|
|
...
|
|
|
]
|
|
|
"""
|
|
|
# 验证用户是否存在
|
|
|
user = db.session.scalar(select(User).filter_by(id=user_id))
|
|
|
if not user:
|
|
|
raise BizException("用户不存在")
|
|
|
|
|
|
for attr_data in attributes:
|
|
|
group = attr_data.get("attr_group")
|
|
|
key = attr_data.get("attr_key")
|
|
|
value = attr_data.get("attr_value")
|
|
|
attr_type = attr_data.get("attr_type", "string")
|
|
|
description = attr_data.get("description")
|
|
|
sort = attr_data.get("sort", 0)
|
|
|
|
|
|
if not group or not key:
|
|
|
continue
|
|
|
|
|
|
# 查找是否已存在
|
|
|
existing = db.session.scalar(
|
|
|
select(SysUserAttribute).filter_by(
|
|
|
user_id=user_id, attr_group=group, attr_key=key
|
|
|
)
|
|
|
)
|
|
|
|
|
|
if existing:
|
|
|
# 更新现有属性
|
|
|
existing.attr_type = attr_type
|
|
|
existing.set_typed_value(value)
|
|
|
if description:
|
|
|
existing.description = description
|
|
|
existing.sort = sort
|
|
|
else:
|
|
|
# 创建新属性
|
|
|
new_attr = SysUserAttribute(
|
|
|
user_id=user_id,
|
|
|
attr_group=group,
|
|
|
attr_key=key,
|
|
|
attr_type=attr_type,
|
|
|
description=description,
|
|
|
sort=sort,
|
|
|
)
|
|
|
new_attr.set_typed_value(value)
|
|
|
db.session.add(new_attr)
|
|
|
|
|
|
db.session.commit()
|
|
|
|
|
|
|
|
|
def get_user_erp_credentials(user_id: str) -> Dict[str, Optional[str]]:
|
|
|
"""
|
|
|
获取用户的 ERP 凭证(便捷方法)
|
|
|
:param user_id: 用户ID
|
|
|
:return: {"erp_username": "xxx", "erp_password": "xxx"}
|
|
|
"""
|
|
|
erp_attrs = get_user_attributes(user_id, group='erp')
|
|
|
return {
|
|
|
"erp_username": erp_attrs.get('erp_username'),
|
|
|
"erp_password": erp_attrs.get('erp_password'),
|
|
|
}
|