You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
iTi-System/iti_system/routes/auth.py

187 lines
6.1 KiB
Python

from __future__ import annotations
from fastapi import APIRouter, Depends, Request
from sqlalchemy.orm import Session
from iti.audit import audit_login
from iti.auth import Actor
from iti.auth import create_access_token, decode_token, require_user
from iti.db import get_db
from iti.exceptions import BizError, Unauthorized
from iti.limiter import limit
from iti.responses import ok
from iti_system.enums import GenderEnum, StatusEnum
from iti_system.models import User
from iti_system.schemas import CodeLogin, PasswordLogin, RegisterIn, SendVerificationCodeIn
from iti_system.services import (
bind_depts,
bind_roles,
create_token_payload,
dump_user,
find_login_user,
new_id,
)
from iti_system.verification import VerificationCodeUsage, check_code, set_code
# Router collecting the authentication endpoints defined in this module;
# every route registered on it is served under the "/auth" prefix and is
# tagged "system.auth".
router = APIRouter(prefix="/auth", tags=["system.auth"])
@router.post("/loginByPassword")
def login_by_password(payload: PasswordLogin, request: Request, db: Session = Depends(get_db)):
    """Log a user in with a password, optionally creating the account first.

    The account is looked up by username, phone and email. When no account
    matches and ``payload.auto_register`` is set, a new account is created
    with the supplied password; otherwise the failed attempt is audited and
    rejected.

    Returns:
        The ``ok`` response wrapping the result of ``create_token_payload``.

    Raises:
        BizError: if no account matches and auto-registration is off, or if
            the password check fails.
    """
    # First non-empty identifier supplied by the caller; used for auditing
    # and as the username of an auto-registered account.
    identifier = payload.username or payload.phone or payload.email

    account = find_login_user(
        db,
        username=payload.username,
        phone=payload.phone,
        email=payload.email,
    )

    if account is None and not payload.auto_register:
        audit_login(request, success=False, desc=identifier, error="用户不存在")
        raise BizError("用户不存在")

    if account is None:
        # Fall back to a generated username when no identifier was supplied.
        account = _create_user(
            db,
            username=identifier or f"user_{new_id()[:8]}",
            password=payload.password,
            phone=payload.phone,
            email=payload.email,
        )

    password_ok = account.check_password(payload.password)
    # Fields shared by the success and failure audit records.
    audit_fields = {
        "actor": Actor(id=account.id, type="user"),
        "desc": account.username,
    }

    if not password_ok:
        audit_login(request, success=False, error="密码错误", **audit_fields)
        raise BizError("密码错误")

    audit_login(request, success=True, **audit_fields)
    token_payload = create_token_payload(account, request.app.state.config)
    return ok(token_payload, message="登录成功")
@router.post("/loginByCode")
def login_by_code(payload: CodeLogin, request: Request, db: Session = Depends(get_db)):
    """Log a user in with a verification code sent to a phone or email.

    The code is checked against the phone number (preferred) or email before
    the account lookup. When no account matches and
    ``payload.auto_register`` is set, an account is created with a generated
    password.

    Returns:
        The ``ok`` response wrapping the result of ``create_token_payload``.

    Raises:
        BizError: if neither phone nor email is given, if the code check
            fails, or if no account matches and auto-registration is off.
    """
    contact = payload.phone or payload.email
    if not contact:
        raise BizError("手机号或邮箱不能为空")

    code_valid = check_code(contact, payload.code, VerificationCodeUsage.LOGIN)
    if not code_valid:
        audit_login(request, success=False, desc=contact, error="验证码错误")
        raise BizError("验证码错误")

    account = find_login_user(db, phone=payload.phone, email=payload.email)
    if account is None:
        if payload.auto_register:
            # Code-only accounts get a generated password from new_id().
            account = _create_user(
                db,
                username=f"用户_{contact}",
                password=new_id(),
                phone=payload.phone,
                email=payload.email,
            )
        else:
            audit_login(request, success=False, desc=contact, error="用户不存在")
            raise BizError("用户不存在")

    actor = Actor(id=account.id, type="user")
    audit_login(request, actor=actor, success=True, desc=contact)
    token_payload = create_token_payload(account, request.app.state.config)
    return ok(token_payload, message="登录成功")
@router.post("/logout")
def logout(principal=Depends(require_user)):
    """Acknowledge a logout request.

    ``principal`` is resolved through ``require_user`` but is not otherwise
    used; the handler only returns a success message.
    """
    response = ok(message="退出登录成功")
    return response
@router.post("/refresh")
def refresh(request: Request):
    """Issue a new access token from the refresh token in ``Authorization``.

    The header must have the form ``Bearer <token>``. The scheme name is
    compared case-insensitively, because HTTP authentication scheme names
    are case-insensitive, and whitespace surrounding the token is ignored.
    The token is decoded with ``token_type="refresh"`` and its ``sub`` claim
    becomes the subject of the new access token.

    Returns:
        The ``ok`` response carrying ``accessToken`` and ``tokenType``.

    Raises:
        Unauthorized: if the header is absent or is not a Bearer credential,
            or if the decoded payload has no ``sub`` value. ``decode_token``
            may raise its own errors for invalid tokens (not visible here).
    """
    authorization = request.headers.get("Authorization", "")

    # Split "<scheme> <credentials>" at the first space; a header without a
    # space has no credentials part and is rejected like a missing header.
    scheme, separator, credentials = authorization.partition(" ")
    if not separator or scheme.lower() != "bearer":
        raise Unauthorized("缺少令牌参数 Authorization Bearer")

    config = request.app.state.config
    payload = decode_token(credentials.strip(), config, token_type="refresh")

    subject = payload.get("sub")
    if not subject:
        raise Unauthorized("无效的令牌")

    access_token = create_access_token(subject, config)
    return ok({"accessToken": access_token, "tokenType": "Bearer"}, message="刷新令牌成功")
@router.post("/register")
def register(payload: RegisterIn, request: Request, db: Session = Depends(get_db)):
    """Register a new account using a password, a verification code, or both.

    When a code is supplied it is checked against the phone number
    (preferred) or email. A missing username falls back to the phone, the
    email, or a generated name; a missing password is replaced by a generated
    one.

    Returns:
        The ``ok`` response wrapping the token payload when
        ``payload.auto_login`` is set, otherwise the dumped user.

    Raises:
        BizError: if both password and code are missing, or if a supplied
            code has no phone/email to check against or fails the check.
    """
    if not (payload.password or payload.code):
        raise BizError("密码或验证码不能同时为空")

    if payload.code:
        contact = payload.phone or payload.email
        code_ok = bool(contact) and check_code(
            contact, payload.code, VerificationCodeUsage.REGISTER
        )
        if not code_ok:
            raise BizError("验证码错误")

    # NOTE(review): gender and status are forwarded exactly as received, so
    # they replace the defaults of _create_user even when unset — confirm
    # that RegisterIn supplies suitable defaults.
    account_fields = {
        "username": payload.username or payload.phone or payload.email or f"user_{new_id()[:8]}",
        "password": payload.password or new_id(),
        "phone": payload.phone,
        "email": payload.email,
        "realname": payload.realname,
        "avatar": payload.avatar,
        "gender": payload.gender,
        "status": payload.status,
    }
    account = _create_user(db, **account_fields)

    if payload.auto_login:
        body = create_token_payload(account, request.app.state.config)
    else:
        body = dump_user(account)
    return ok(body, message="注册成功")
@router.post("/sendVerificationCode", dependencies=[limit("2 per minute")])
def send_verification_code(payload: SendVerificationCodeIn):
    """Create a verification code for a phone number or email address.

    The phone number takes precedence over the email when both are given.

    Raises:
        BizError: if neither a phone number nor an email is supplied.
    """
    target = payload.phone or payload.email
    if target:
        # NOTE(review): whatever set_code returns is sent back to the caller;
        # confirm it does not expose the code itself.
        result = set_code(target, payload.usage)
        return ok(result, message="发送验证码成功")
    raise BizError("手机号或邮箱不能为空")
@router.get("/codes")
def get_user_permissions(principal=Depends(require_user)):
    """Return the authenticated principal's permissions in sorted order."""
    codes = list(principal.permissions)
    codes.sort()
    return ok(codes)
def _create_user(
    db: Session,
    *,
    username: str,
    password: str,
    phone: str | None = None,
    email: str | None = None,
    realname: str | None = None,
    avatar: str | None = None,
    gender: str = GenderEnum.SECURE.value,
    status: str = StatusEnum.ENABLED.value,
) -> User:
    """Create, persist and return a new user.

    The user is created with the role and department bindings produced by
    ``bind_roles`` / ``bind_depts`` for an empty list, the password is set
    through ``User.set_password``, and the row is committed and refreshed.

    Raises:
        BizError: if ``find_login_user`` already finds an account for the
            given username, phone or email.
    """
    existing = find_login_user(db, username=username, phone=phone, email=email)
    if existing:
        raise BizError("用户已存在")

    profile = {
        "username": username,
        "phone": phone,
        "email": email,
        "realname": realname,
        "avatar": avatar,
        "gender": gender,
        "status": status,
    }
    account = User(
        **profile,
        roles=bind_roles(db, []),
        depts=bind_depts(db, []),
    )
    account.set_password(password)

    db.add(account)
    db.commit()
    # Reload the instance after the commit before handing it to the caller.
    db.refresh(account)
    return account