from datetime import datetime, timedelta from typing import Optional, Union, Any from jose import jwt from passlib.context import CryptContext from app.core.config import settings pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") def create_access_token( subject: Union[str, Any], expires_delta: Optional[timedelta] = None ) -> str: """ 创建 JWT Access Token Args: subject: 用户标识(通常是用户名或用户ID) expires_delta: 过期时间增量 Returns: JWT token 字符串 """ if expires_delta: expire = datetime.now() + expires_delta else: expire = datetime.now() + timedelta( minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES ) to_encode = { "exp": expire, "sub": str(subject), "type": "access", "iat": datetime.now(), } encoded_jwt = jwt.encode( to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM ) return encoded_jwt def create_refresh_token(subject: Union[str, Any]) -> str: """ 创建 JWT Refresh Token(长期有效) Args: subject: 用户标识 Returns: JWT refresh token 字符串 """ expire = datetime.now() + timedelta(days=settings.REFRESH_TOKEN_EXPIRE_DAYS) to_encode = { "exp": expire, "sub": str(subject), "type": "refresh", "iat": datetime.now(), } encoded_jwt = jwt.encode( to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM ) return encoded_jwt def verify_password(plain_password: str, hashed_password: str) -> bool: """ 验证密码 Args: plain_password: 明文密码 hashed_password: 密码哈希 Returns: 是否匹配 """ return pwd_context.verify(plain_password, hashed_password) def get_password_hash(password: str) -> str: """ 生成密码哈希 Args: password: 明文密码 Returns: bcrypt 哈希字符串 """ return pwd_context.hash(password)