"""Authentication routes: password/code login, logout, token refresh,
registration, verification-code delivery and permission listing."""

from __future__ import annotations

from fastapi import APIRouter, Depends, Request
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import Session

from iti.audit import audit_login
from iti.auth import Actor
from iti.auth import create_access_token, decode_token, require_user
from iti.db import get_db
from iti.exceptions import BizError, Unauthorized
from iti.limiter import limit
from iti.responses import ok
from iti_system.enums import GenderEnum, StatusEnum
from iti_system.models import User
from iti_system.schemas import CodeLogin, PasswordLogin, RegisterIn, SendVerificationCodeIn
from iti_system.services import (
    bind_depts,
    bind_roles,
    create_token_payload,
    dump_user,
    find_login_user,
    new_id,
)
from iti_system.verification import VerificationCodeUsage, check_code, set_code

router = APIRouter(prefix="/auth", tags=["system.auth"])


@router.post("/loginByPassword")
def login_by_password(payload: PasswordLogin, request: Request, db: Session = Depends(get_db)):
    """Log a user in with username/phone/email plus password.

    If no matching user exists and ``payload.auto_register`` is set, a new
    user is created with the supplied password before the password check.
    Every outcome (unknown user, wrong password, success) is recorded via
    ``audit_login``.

    Raises:
        BizError: the user does not exist (and auto-register is off), or the
            password does not match.
    """
    # First non-empty identifier; used both for auditing and as the default
    # username when auto-registering.
    identifier = payload.username or payload.phone or payload.email

    user = find_login_user(
        db,
        username=payload.username,
        phone=payload.phone,
        email=payload.email,
    )
    if user is None:
        if not payload.auto_register:
            audit_login(
                request,
                success=False,
                desc=identifier,
                error="用户不存在",
            )
            raise BizError("用户不存在")
        user = _create_user(
            db,
            username=identifier or f"user_{new_id()[:8]}",
            password=payload.password,
            phone=payload.phone,
            email=payload.email,
        )

    if not user.check_password(payload.password):
        audit_login(
            request,
            actor=Actor(id=user.id, type="user"),
            success=False,
            desc=user.username,
            error="密码错误",
        )
        raise BizError("密码错误")

    audit_login(
        request,
        actor=Actor(id=user.id, type="user"),
        success=True,
        desc=user.username,
    )
    return ok(create_token_payload(user, request.app.state.config), message="登录成功")


@router.post("/loginByCode")
def login_by_code(payload: CodeLogin, request: Request, db: Session = Depends(get_db)):
    """Log a user in with a verification code sent to a phone or email.

    The code is checked before the user lookup. If no user matches and
    ``payload.auto_register`` is set, a user is created with a generated
    password (``new_id()``).

    Raises:
        BizError: neither phone nor email was supplied, the code check
            failed, or the user does not exist (and auto-register is off).
    """
    subject = payload.phone or payload.email
    if not subject:
        raise BizError("手机号或邮箱不能为空")

    if not check_code(subject, payload.code, VerificationCodeUsage.LOGIN):
        audit_login(request, success=False, desc=subject, error="验证码错误")
        raise BizError("验证码错误")

    user = find_login_user(db, phone=payload.phone, email=payload.email)
    if user is None:
        if not payload.auto_register:
            audit_login(request, success=False, desc=subject, error="用户不存在")
            raise BizError("用户不存在")
        user = _create_user(
            db,
            username=f"用户_{subject}",
            password=new_id(),
            phone=payload.phone,
            email=payload.email,
        )

    audit_login(
        request,
        actor=Actor(id=user.id, type="user"),
        success=True,
        desc=subject,
    )
    return ok(create_token_payload(user, request.app.state.config), message="登录成功")


@router.post("/logout")
def logout(principal=Depends(require_user)):
    """Acknowledge a logout request from an authenticated user.

    ``principal`` is unused in the body; the dependency is declared so that
    ``require_user`` runs for this route.
    """
    return ok(message="退出登录成功")


@router.post("/refresh")
def refresh(request: Request):
    """Issue a new access token from a refresh token in the Authorization header.

    The header must have the form ``Bearer <token>``; the scheme name is
    matched case-insensitively.

    Raises:
        Unauthorized: the header is missing or not a Bearer credential, or
            the decoded token carries no ``sub`` claim.
    """
    authorization = request.headers.get("Authorization", "")
    # HTTP auth scheme names are case-insensitive, so compare the scheme in
    # lower case rather than requiring the exact prefix "Bearer ".
    scheme, separator, token = authorization.partition(" ")
    if scheme.lower() != "bearer" or not separator:
        raise Unauthorized("缺少令牌参数 Authorization Bearer")

    payload = decode_token(
        token.strip(),
        request.app.state.config,
        token_type="refresh",
    )
    subject = payload.get("sub")
    if not subject:
        raise Unauthorized("无效的令牌")

    access_token = create_access_token(subject, request.app.state.config)
    return ok({"accessToken": access_token, "tokenType": "Bearer"}, message="刷新令牌成功")


@router.post("/register")
def register(payload: RegisterIn, request: Request, db: Session = Depends(get_db)):
    """Register a new user with a password and/or a verification code.

    At least one of password or code must be supplied. When a code is
    supplied it is checked against the phone/email. When no password is
    supplied, a generated one (``new_id()``) is used. Returns a token
    payload if ``payload.auto_login`` is set, otherwise the dumped user.

    Raises:
        BizError: both password and code are empty, the code check failed,
            or the user already exists.
    """
    if not payload.password and not payload.code:
        raise BizError("密码或验证码不能同时为空")

    if payload.code:
        subject = payload.phone or payload.email
        if not subject or not check_code(subject, payload.code, VerificationCodeUsage.REGISTER):
            raise BizError("验证码错误")

    # NOTE(review): gender/status are forwarded exactly as received; if the
    # schema allows None for them, None overrides _create_user's defaults —
    # confirm against RegisterIn.
    user = _create_user(
        db,
        username=payload.username or payload.phone or payload.email or f"user_{new_id()[:8]}",
        password=payload.password or new_id(),
        phone=payload.phone,
        email=payload.email,
        realname=payload.realname,
        avatar=payload.avatar,
        gender=payload.gender,
        status=payload.status,
    )

    if payload.auto_login:
        return ok(create_token_payload(user, request.app.state.config), message="注册成功")
    return ok(dump_user(user), message="注册成功")


@router.post("/sendVerificationCode", dependencies=[limit("2 per minute")])
def send_verification_code(payload: SendVerificationCodeIn):
    """Create a verification code for a phone or email (limited to 2 per minute).

    Raises:
        BizError: neither phone nor email was supplied.
    """
    subject = payload.phone or payload.email
    if not subject:
        raise BizError("手机号或邮箱不能为空")
    # NOTE(review): whatever set_code returns is sent back to the client —
    # confirm it does not include the code itself outside development.
    return ok(set_code(subject, payload.usage), message="发送验证码成功")


@router.get("/codes")
def get_user_permissions(principal=Depends(require_user)):
    """Return the authenticated principal's permissions, sorted."""
    return ok(sorted(principal.permissions))


def _create_user(
    db: Session,
    *,
    username: str,
    password: str,
    phone: str | None = None,
    email: str | None = None,
    realname: str | None = None,
    avatar: str | None = None,
    gender: str = GenderEnum.SECURE.value,
    status: str = StatusEnum.ENABLED.value,
) -> User:
    """Create, persist and return a new user.

    The user is created with the role/department bindings produced by
    ``bind_roles(db, [])`` and ``bind_depts(db, [])``, and the password is
    applied through ``User.set_password``.

    Raises:
        BizError: a user matching the username, phone or email already
            exists, either at the up-front check or at commit time.
    """
    if find_login_user(db, username=username, phone=phone, email=email):
        raise BizError("用户已存在")

    user = User(
        username=username,
        phone=phone,
        email=email,
        realname=realname,
        avatar=avatar,
        gender=gender,
        status=status,
        roles=bind_roles(db, []),
        depts=bind_depts(db, []),
    )
    user.set_password(password)
    db.add(user)
    try:
        db.commit()
    except IntegrityError as err:
        # The existence check above is not atomic with the insert: a
        # concurrent request can create the same user in between. Roll back
        # so the session is usable again, then report the conflict as the
        # usual business error. Any other integrity failure is re-raised.
        db.rollback()
        if find_login_user(db, username=username, phone=phone, email=email):
            raise BizError("用户已存在") from err
        raise
    db.refresh(user)
    return user