You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
iTi-Flask/iti/applications/routes/sys/auth.py

334 lines
9.6 KiB
Python

from apiflask import APIBlueprint
from flask import current_app
from flask_jwt_extended import (
create_access_token,
create_refresh_token,
jwt_required,
)
from sqlalchemy import select
from sqlalchemy.orm import noload
from iti.applications.common.exceptions.biz_exp import BizException
from iti.applications.common.utils import success
from iti.applications.common.events import UserEvents
from iti.applications.extensions.limit import limiter
from iti.applications.models import User, UserSchema, Role, SysDept
from datetime import timedelta
from flask_jwt_extended import current_user
from iti.applications.extensions import db, eventbus, sys_log
from iti.applications.common.enums import GenderEnum, StatusEnum, LoginType
from iti.applications.service.sys_config import (
get_default_user_password,
get_default_user_roles,
get_default_user_depts,
)
from .schemas.auth import (
PasswordLoginRequest,
CodeLoginRequest,
RegisterRequest,
SendVerificationCodeRequest,
)
from iti.applications.service.verification_code import (
check_verification_code,
VerificationCodeUsage,
set_verification_code,
)
from iti.applications.common.enums import LogType
bp = APIBlueprint("sys_index", __name__, url_prefix="/auth", tag="系统.Auth")
@bp.post("/loginByPassword")
@bp.input(PasswordLoginRequest.Schema, location="json")
@sys_log(
name="密码登录",
desc="使用密码作为登录认证方式",
type=LogType.AUTH,
save_db=True,
execute_time=True,
)
def loginByPassword(json_data: PasswordLoginRequest):
"""
登录(仅限密码登录)
"""
if json_data.username:
user = db.session.scalar(select(User).filter_by(username=json_data.username))
elif json_data.phone:
user = db.session.scalar(select(User).filter_by(phone=json_data.phone))
elif json_data.email:
user = db.session.scalar(select(User).filter_by(email=json_data.email))
if user is None:
if json_data.autoRegister:
user = registerByPassword(
RegisterRequest(
username=json_data.username,
phone=json_data.phone,
email=json_data.email,
password=json_data.password,
gender=GenderEnum.SECURE,
status=StatusEnum.ENABLED,
)
)
else:
raise BizException("用户不存在")
if not user.check_password(json_data.password):
raise BizException("密码错误")
eventbus.emit(UserEvents.USER_LOGGED_IN.value, user, login_type=LoginType.PASSWORD)
return success(
login(user, refresh_token=True),
message="登录成功",
)
@bp.post("/loginByCode")
@bp.input(CodeLoginRequest.Schema, location="json")
@sys_log(
name="验证码登录",
desc="使用验证码作为登录认证方式,支持短信验证码和邮箱验证码",
type=LogType.AUTH,
save_db=True,
execute_time=True,
)
def loginByCode(json_data: CodeLoginRequest):
"""
登录(仅限验证码登录)
"""
if json_data.phone:
user = db.session.scalar(select(User).filter_by(phone=json_data.phone))
elif json_data.email:
user = db.session.scalar(select(User).filter_by(email=json_data.email))
if user is None:
if json_data.autoRegister:
user = registerByCode(
RegisterRequest(
phone=json_data.phone,
email=json_data.email,
code=json_data.code,
gender=GenderEnum.SECURE,
status=StatusEnum.ENABLED,
),
skip_check=True,
)
else:
raise BizException("用户不存在")
if not check_verification_code(
json_data.phone if json_data.phone else json_data.email,
json_data.code,
VerificationCodeUsage.LOGIN,
):
raise BizException("验证码错误")
return success(login(user, refresh_token=True), message="登录成功")
@bp.post("/logout")
@jwt_required()
@sys_log(
name="退出登录",
desc="退出登录",
type=LogType.AUTH,
save_db=True,
execute_time=True,
)
def logout():
"""
退出登录
"""
# 触发用户注销事件
eventbus.emit(UserEvents.USER_LOGOUT.value, current_user)
return success(message="退出登录成功")
@bp.post("/refresh")
@jwt_required(refresh=True)
@sys_log(
name="刷新令牌",
desc="刷新令牌",
type=LogType.AUTH,
save_db=True,
execute_time=True,
)
def refresh():
"""
刷新令牌
"""
# 触发用户刷新令牌事件
eventbus.emit(UserEvents.USER_AUTH_REFRESHED.value, current_user)
return success(login(current_user, refresh_token=True), message="刷新令牌成功")
@bp.post("/register")
@bp.input(RegisterRequest.Schema, location="json")
@bp.output(UserSchema)
@sys_log(
name="注册",
desc="用户注册",
type=LogType.OPERATION,
save_db=True,
execute_time=True,
)
def register(json_data: RegisterRequest):
"""
注册
"""
if json_data.password:
user = registerByPassword(json_data)
elif json_data.code:
user = registerByCode(json_data)
else:
raise BizException("密码或验证码不能同时为空")
if user is None:
raise BizException("注册失败")
# 触发用户注册事件
eventbus.emit(UserEvents.USER_REGISTERED.value, user)
# 注册成功自动登录
if json_data.autoLogin:
return success(login(user, refresh_token=True), message="注册成功")
return success(user, message="注册成功")
@bp.post("/sendVerificationCode")
@bp.input(SendVerificationCodeRequest.Schema, location="json")
@limiter.limit(limit_value="2 per minute")
@sys_log(
name="发送验证码",
desc="发送验证码",
type=LogType.OPERATION,
save_db=True,
execute_time=True,
)
def sendVerificationCode(json_data: SendVerificationCodeRequest):
"""
发送验证码
"""
# TODO 模拟发送验证码
return success(
set_verification_code(
subject=json_data.phone if json_data.phone else json_data.email,
usage=json_data.usage,
),
message="发送验证码成功",
)
@bp.get("/codes")
@jwt_required()
def get_user_permissions():
"""
获取用户权限编码
"""
return success(current_user.permissions)
def login(user, refresh_token: bool = False) -> dict:
"""
JWT登录
"""
access_token = create_access_token(
user,
# additional_claims={
# "roles": [role.code for role in user.roles],
# "permissions": user.permissions,
# },
)
if refresh_token:
refresh_token = create_refresh_token(user)
else:
refresh_token = None
return {
"access_token": access_token,
"token_type": "Bearer",
"expires_in": current_app.config.get(
"JWT_ACCESS_TOKEN_EXPIRES", timedelta(hours=1)
).total_seconds(),
"refresh_token": refresh_token,
"refresh_expires_in": current_app.config.get(
"JWT_REFRESH_TOKEN_EXPIRES", timedelta(days=30)
).total_seconds(),
}
def newUserByRegister(req: RegisterRequest) -> User:
"""
新建用户
"""
user = User(
username=req.username,
phone=req.phone,
email=req.email,
realname=req.realname,
avatar=req.avatar,
gender=req.gender,
status=req.status,
)
if req.password:
user.password = req.password
else:
user.password = get_default_user_password(default="TBemzAJ4jDvXkm.!")
# 默认绑定角色
user.roles = db.session.scalars(
select(Role)
.filter(Role.code.in_(get_default_user_roles()))
.options(noload(Role.menus), noload(Role.users))
).all()
# 默认部门
user.depts = db.session.scalars(
select(SysDept)
.filter(SysDept.id.in_(get_default_user_depts()))
.options(noload(SysDept.users))
).all()
db.session.add(user)
db.session.commit()
return user
def registerByCode(req: RegisterRequest, skip_check: bool = False) -> User:
"""
注册(仅限验证码注册)
使用系统配置的默认密码
使用随机用户名 (基于phone或者email)
"""
# 前置验证,用户是否存在
if req.phone:
user = db.session.scalar(select(User).filter_by(phone=req.phone))
elif req.email:
user = db.session.scalar(select(User).filter_by(email=req.email))
if user is not None:
raise BizException("用户已存在")
# 验证验证码是否正确
if not skip_check:
if not check_verification_code(
req.phone if req.phone else req.email,
req.code,
VerificationCodeUsage.REGISTER,
):
raise BizException("验证码错误")
current_app.logger.warning(f"验证码注册,跳过验证码验证: {req.code}")
if not req.username:
if req.phone:
req.username = f"用户_{req.phone}"
elif req.email:
req.username = f"用户_{req.email}"
return newUserByRegister(req)
def registerByPassword(req: RegisterRequest):
"""
注册(仅限密码注册)
"""
# 前置验证,用户是否存在
if req.username:
user = db.session.scalar(select(User).filter_by(username=req.username))
elif req.phone:
user = db.session.scalar(select(User).filter_by(phone=req.phone))
elif req.email:
user = db.session.scalar(select(User).filter_by(email=req.email))
if user is not None:
raise BizException("用户已存在")
return newUserByRegister(req)