You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
67 lines
1.7 KiB
Python
67 lines
1.7 KiB
Python
from __future__ import annotations
|
|
|
|
from datetime import datetime, timedelta, timezone
|
|
from typing import Any
|
|
|
|
from jose import JWTError, jwt
|
|
|
|
from iti.config import BaseConfig
|
|
from iti.exceptions import Unauthorized
|
|
|
|
|
|
def _create_token(
|
|
subject: str,
|
|
config: BaseConfig,
|
|
*,
|
|
token_type: str,
|
|
expires_seconds: int,
|
|
claims: dict[str, Any] | None = None,
|
|
) -> str:
|
|
now = datetime.now(timezone.utc)
|
|
payload = {
|
|
"sub": subject,
|
|
"type": token_type,
|
|
"iat": int(now.timestamp()),
|
|
"exp": int((now + timedelta(seconds=expires_seconds)).timestamp()),
|
|
**(claims or {}),
|
|
}
|
|
return jwt.encode(payload, config.jwt_secret_key, algorithm=config.jwt_algorithm)
|
|
|
|
|
|
def create_access_token(
|
|
subject: str,
|
|
config: BaseConfig,
|
|
claims: dict[str, Any] | None = None,
|
|
) -> str:
|
|
return _create_token(
|
|
subject,
|
|
config,
|
|
token_type="access",
|
|
expires_seconds=config.jwt_access_token_expires_seconds,
|
|
claims=claims,
|
|
)
|
|
|
|
|
|
def create_refresh_token(
|
|
subject: str,
|
|
config: BaseConfig,
|
|
claims: dict[str, Any] | None = None,
|
|
) -> str:
|
|
return _create_token(
|
|
subject,
|
|
config,
|
|
token_type="refresh",
|
|
expires_seconds=config.jwt_refresh_token_expires_seconds,
|
|
claims=claims,
|
|
)
|
|
|
|
|
|
def decode_token(token: str, config: BaseConfig, *, token_type: str | None = None) -> dict:
|
|
try:
|
|
payload = jwt.decode(token, config.jwt_secret_key, algorithms=[config.jwt_algorithm])
|
|
except JWTError as exc:
|
|
raise Unauthorized("无效的令牌") from exc
|
|
if token_type is not None and payload.get("type") != token_type:
|
|
raise Unauthorized("令牌类型错误")
|
|
return payload
|