from __future__ import annotations from datetime import datetime, timedelta, timezone from typing import Any from jose import JWTError, jwt from iti.config import BaseConfig from iti.exceptions import Unauthorized def _create_token( subject: str, config: BaseConfig, *, token_type: str, expires_seconds: int, claims: dict[str, Any] | None = None, ) -> str: now = datetime.now(timezone.utc) payload = { "sub": subject, "type": token_type, "iat": int(now.timestamp()), "exp": int((now + timedelta(seconds=expires_seconds)).timestamp()), **(claims or {}), } return jwt.encode(payload, config.jwt_secret_key, algorithm=config.jwt_algorithm) def create_access_token( subject: str, config: BaseConfig, claims: dict[str, Any] | None = None, ) -> str: return _create_token( subject, config, token_type="access", expires_seconds=config.jwt_access_token_expires_seconds, claims=claims, ) def create_refresh_token( subject: str, config: BaseConfig, claims: dict[str, Any] | None = None, ) -> str: return _create_token( subject, config, token_type="refresh", expires_seconds=config.jwt_refresh_token_expires_seconds, claims=claims, ) def decode_token(token: str, config: BaseConfig, *, token_type: str | None = None) -> dict: try: payload = jwt.decode(token, config.jwt_secret_key, algorithms=[config.jwt_algorithm]) except JWTError as exc: raise Unauthorized("无效的令牌") from exc if token_type is not None and payload.get("type") != token_type: raise Unauthorized("令牌类型错误") return payload