import base64
import os
from typing import Optional

from cryptography.fernet import Fernet

from app.core.config import settings


def _resolve_encryption_key() -> Optional[str]:
    """Return the general-purpose key string, or a falsy value if unset.

    The ``ENCRYPTION_KEY`` environment variable takes precedence over
    ``settings.ENCRYPTION_KEY``.
    """
    return os.getenv("ENCRYPTION_KEY") or settings.ENCRYPTION_KEY


def _resolve_database_encryption_key() -> Optional[str]:
    """Return the database key string, or a falsy value if unset.

    Lookup order: ``DATABASE_ENCRYPTION_KEY`` (environment, then settings),
    falling back to ``ENCRYPTION_KEY`` (environment, then settings).
    """
    return (
        os.getenv("DATABASE_ENCRYPTION_KEY")
        or settings.DATABASE_ENCRYPTION_KEY
        or os.getenv("ENCRYPTION_KEY")
        or settings.ENCRYPTION_KEY
    )


class Encryptor:
    """Encrypt and decrypt strings with Fernet (symmetric encryption).

    Intended for sensitive configuration values, user data and similar.
    """

    def __init__(self, key: Optional[bytes] = None):
        """Create an encryptor.

        Args:
            key: Fernet key. When ``None``, the key is read from the
                ``ENCRYPTION_KEY`` environment variable, falling back to
                ``settings.ENCRYPTION_KEY``.

        Raises:
            ValueError: If ``key`` is ``None`` and no key is configured.
                ``Fernet`` itself validates the key format and may raise
                for a malformed key.
        """
        if key is None:
            key_str = _resolve_encryption_key()
            if not key_str:
                raise ValueError(
                    "ENCRYPTION_KEY not found in environment variables or .env. "
                    "Generate one using: Encryptor.generate_key()"
                )
            key = key_str.encode()

        self.fernet = Fernet(key)

    def encrypt(self, data: str) -> str:
        """Encrypt a string.

        Args:
            data: Plaintext to encrypt.

        Returns:
            The Base64-encoded encrypted token as a string. Falsy input
            (e.g. ``""``) is returned unchanged without being encrypted.
        """
        if not data:
            return data

        encrypted_bytes = self.fernet.encrypt(data.encode())
        return encrypted_bytes.decode()

    def decrypt(self, data: str) -> str:
        """Decrypt a string produced by :meth:`encrypt`.

        Args:
            data: Base64-encoded encrypted token.

        Returns:
            The decrypted plaintext. Falsy input (e.g. ``""``) is returned
            unchanged, mirroring :meth:`encrypt`.

        Errors raised by ``Fernet.decrypt`` (for example on a tampered token
        or a wrong key) are not caught here and propagate to the caller.
        """
        if not data:
            return data

        decrypted_bytes = self.fernet.decrypt(data.encode())
        return decrypted_bytes.decode()

    @staticmethod
    def generate_key() -> str:
        """Generate a new Fernet key.

        Returns:
            The Base64-encoded key as a string.
        """
        key = Fernet.generate_key()
        return key.decode()


# Module-level encryptor instances (created lazily on first use).
_encryptor: Optional[Encryptor] = None
_database_encryptor: Optional[Encryptor] = None


def is_encryption_configured() -> bool:
    """Return whether a general-purpose encryption key is configured."""
    return bool(_resolve_encryption_key())


def is_database_encryption_configured() -> bool:
    """Return whether a key usable for database encryption is configured.

    Either the dedicated database key or the general-purpose key counts.
    """
    return bool(_resolve_database_encryption_key())


def get_encryptor() -> Encryptor:
    """Return the shared encryptor instance, creating it on first call."""
    global _encryptor
    if _encryptor is None:
        _encryptor = Encryptor()
    return _encryptor


def get_database_encryptor() -> Encryptor:
    """Return the shared encryptor dedicated to project DB DSNs.

    The instance is created on first call using the database key, falling
    back to the general-purpose key when no dedicated key is set.

    Raises:
        ValueError: If neither key is configured.
    """
    global _database_encryptor
    if _database_encryptor is None:
        key_str = _resolve_database_encryption_key()
        if not key_str:
            raise ValueError(
                "DATABASE_ENCRYPTION_KEY not found in environment variables or .env. "
                "Generate one using: Encryptor.generate_key()"
            )
        _database_encryptor = Encryptor(key=key_str.encode())
    return _database_encryptor


# Backward compatibility: expose ``encryptor`` as a lazily created module
# attribute so importing this module does not require a configured key.
def __getattr__(name: str) -> Encryptor:
    """Resolve the legacy module attribute ``encryptor`` on demand.

    Raises:
        AttributeError: For any attribute name other than ``encryptor``.
    """
    if name == "encryptor":
        return get_encryptor()
    raise AttributeError(f"module '{__name__}' has no attribute '{name}'")