94 lines
2.7 KiB
Python
94 lines
2.7 KiB
Python
from typing import Any, Literal
|
|
|
|
import httpx
|
|
from pydantic import BaseModel, Field
|
|
|
|
from app.core.config import settings
|
|
|
|
|
|
# Preset values accepted by WebSearchRequest.freshness; that field is typed
# `Freshness | str`, so other strings (its description mentions date ranges)
# are accepted as well.
Freshness = Literal["noLimit", "oneDay", "oneWeek", "oneMonth", "oneYear"]
|
|
|
|
|
|
class WebSearchRequest(BaseModel):
    # Parameters of one web search request. Documented with comments rather
    # than a class docstring because pydantic publishes a model's docstring as
    # its JSON-schema description, which would change the generated schema.

    # Search keywords; must be a non-empty string.
    query: str = Field(..., min_length=1, description="搜索关键词")
    # Time range: one of the Freshness presets, or a free-form string (the
    # description mentions a date range). The string format is not validated
    # here.
    freshness: Freshness | str = Field(
        default="noLimit",
        description="时间范围:noLimit、oneDay、oneWeek、oneMonth、oneYear 或日期范围",
    )
    # Whether web page summaries should be returned.
    summary: bool = Field(default=True, description="是否返回网页摘要")
    # Number of results to return, limited to 1..50.
    count: int = Field(default=10, ge=1, le=50, description="返回结果数量")
    # Domains to restrict the search to; None means no restriction.
    include: list[str] | None = Field(default=None, description="限定搜索域名")
    # Domains to exclude from the search; None means no exclusion.
    exclude: list[str] | None = Field(default=None, description="排除搜索域名")
|
|
|
|
|
|
class BochaSearchConfigError(RuntimeError):
    """Signal that the Bocha search integration is not configured correctly."""
|
|
|
|
|
|
class BochaSearchAPIError(RuntimeError):
    """Error carrying the status and detail of a failed Bocha API request.

    The exception message is always the fixed string
    ``"Bocha Web Search API request failed"``; the specifics live in the
    attributes.

    Attributes:
        status_code: HTTP status code supplied by the raiser.
        detail: Error detail supplied by the raiser (any type).
    """

    def __init__(self, status_code: int, detail: Any):
        """Store ``status_code`` and ``detail`` alongside the fixed message."""
        super().__init__("Bocha Web Search API request failed")
        self.status_code = status_code
        self.detail = detail

    def __reduce__(self) -> tuple[Any, ...]:
        """Rebuild from the constructor arguments when copied or pickled."""
        # The inherited implementation re-creates the exception as
        # ``cls(*self.args)``. ``args`` holds only the fixed message, so that
        # call would fail with a TypeError (missing ``detail``) whenever the
        # error is copied or sent across a process boundary.
        return (type(self), (self.status_code, self.detail))
|
|
|
|
|
|
def _build_payload(request: WebSearchRequest) -> dict[str, Any]:
    """Convert a search request into the JSON body sent to the search API.

    Fields whose value is ``None`` are omitted. ``include`` and ``exclude``
    are sent as a single comma-separated string of domains. An empty list is
    treated like ``None`` and the key is dropped, so the body never carries a
    JSON array for these two fields.

    Args:
        request: The validated search request.

    Returns:
        A dict ready to be passed as the ``json=`` body of the POST request.
    """
    payload = request.model_dump(exclude_none=True)
    for key in ("include", "exclude"):
        domains = getattr(request, key)
        if domains:
            payload[key] = ",".join(domains)
        else:
            # ``exclude_none`` removes None but keeps ``[]``; drop it so an
            # empty filter is not serialized as a list.
            payload.pop(key, None)
    return payload
|
|
|
|
|
|
async def search_bocha_web(
    request: WebSearchRequest,
    *,
    client: httpx.AsyncClient | None = None,
) -> dict[str, Any]:
    """POST a web search to the configured Bocha endpoint and return its JSON.

    Args:
        request: Search parameters; converted to the request body by
            ``_build_payload``.
        client: Optional caller-owned HTTP client. When given it is used as-is
            (its own timeout applies) and is not closed here. Otherwise a
            temporary client with ``settings.BOCHA_WEB_SEARCH_TIMEOUT_SECONDS``
            as timeout is created and closed around the call.

    Returns:
        The decoded JSON body of the response.

    Raises:
        BochaSearchConfigError: If ``settings.BOCHA_API_KEY`` is empty.
        BochaSearchAPIError: If the API answers with an HTTP error status.

    Exceptions raised by httpx while sending the request are not caught here.
    """
    api_key = settings.BOCHA_API_KEY
    if not api_key:
        raise BochaSearchConfigError("BOCHA_API_KEY is not configured")

    request_headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json",
    }
    body = _build_payload(request)

    async def _send(http: httpx.AsyncClient) -> dict[str, Any]:
        # One POST + parse sequence shared by both client variants.
        reply = await http.post(
            settings.BOCHA_WEB_SEARCH_URL,
            headers=request_headers,
            json=body,
        )
        return _parse_response(reply)

    if client is not None:
        return await _send(client)

    async with httpx.AsyncClient(
        timeout=settings.BOCHA_WEB_SEARCH_TIMEOUT_SECONDS
    ) as owned_client:
        return await _send(owned_client)
|
|
|
|
|
|
def _parse_response(response: httpx.Response) -> dict[str, Any]:
    """Return the JSON body of ``response`` or raise on an HTTP error status.

    Args:
        response: The response received from the search endpoint.

    Returns:
        The decoded JSON body.

    Raises:
        BochaSearchAPIError: If ``raise_for_status()`` reports an error; it
            carries the status code and the body (JSON if decodable, else
            text) and is chained to the original ``httpx.HTTPStatusError``.

    A successful response whose body cannot be decoded by ``response.json()``
    is not translated; that decoding error propagates unchanged.
    """
    try:
        response.raise_for_status()
    except httpx.HTTPStatusError as exc:
        failed = exc.response
        raise BochaSearchAPIError(
            failed.status_code,
            _response_detail(failed),
        ) from exc
    else:
        return response.json()
|
|
|
|
|
|
def _response_detail(response: httpx.Response) -> Any:
    """Extract the most structured error detail available from ``response``.

    Returns the decoded JSON body when decoding succeeds; if ``response.json()``
    raises ``ValueError``, falls back to the raw response text.
    """
    try:
        detail = response.json()
    except ValueError:
        detail = response.text
    return detail
|