feat(api): add web search endpoint

This commit is contained in:
2026-06-09 16:13:24 +08:00
parent 441979f581
commit 1712ecd4c7
7 changed files with 256 additions and 35 deletions
+8
View File
@@ -48,3 +48,11 @@ METADATA_DB_PASSWORD="password"
KEYCLOAK_PUBLIC_KEY="-----BEGIN PUBLIC KEY-----\n...\n-----END PUBLIC KEY-----"
KEYCLOAK_ALGORITHM=RS256
KEYCLOAK_AUDIENCE="account"
# ============================================
# Bocha Web Search API
# ============================================
BOCHA_API_KEY="sk-your-bocha-api-key"
BOCHA_WEB_SEARCH_URL="https://api.bochaai.com/v1/web-search"
BOCHA_WEB_SEARCH_TIMEOUT_SECONDS=30
+29
View File
@@ -0,0 +1,29 @@
from typing import Any
from fastapi import APIRouter, HTTPException, status
from app.services.web_search import (
BochaSearchAPIError,
BochaSearchConfigError,
WebSearchRequest,
search_bocha_web,
)
router = APIRouter()
@router.post(
"/web-search",
summary="Web Search",
description="调用 Bocha Web Search API 获取实时网页搜索结果",
)
async def web_search(request: WebSearchRequest) -> dict[str, Any]:
try:
return await search_bocha_web(request)
except BochaSearchConfigError as exc:
raise HTTPException(
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
detail=str(exc),
) from exc
except BochaSearchAPIError as exc:
raise HTTPException(status_code=exc.status_code, detail=exc.detail) from exc
+2
View File
@@ -18,6 +18,7 @@ from app.api.v1.endpoints import (
user_management, # 新增:用户管理
audit, # 新增:审计日志
meta,
web_search,
)
from app.api.v1.endpoints.network import (
general,
@@ -93,6 +94,7 @@ api_router.include_router(schemes.router, tags=["Schemes"])
api_router.include_router(misc.router, tags=["Misc"])
api_router.include_router(risk.router, tags=["Risk"])
api_router.include_router(cache.router, tags=["Cache"])
api_router.include_router(web_search.router, tags=["Web Search"])
api_router.include_router(leakage.router, prefix="/leakage", tags=["Leakage"])
api_router.include_router(
burst_detection.router, prefix="/burst-detection", tags=["Burst Detection"]
+5
View File
@@ -64,6 +64,11 @@ class Settings(BaseSettings):
KEYCLOAK_ALGORITHM: str = "RS256"
KEYCLOAK_AUDIENCE: str = ""
# Bocha Web Search API
BOCHA_API_KEY: str = ""
BOCHA_WEB_SEARCH_URL: str = "https://api.bochaai.com/v1/web-search"
BOCHA_WEB_SEARCH_TIMEOUT_SECONDS: float = 30.0
@property
def SQLALCHEMY_DATABASE_URI(self) -> str:
db_password = quote_plus(self.DB_PASSWORD)
+4 -35
View File
@@ -1,36 +1,5 @@
from app.services.network_import import network_update, submit_scada_info
from app.services.scheme_management import (
create_user,
delete_user,
scheme_name_exists,
store_scheme_info,
delete_scheme_info,
query_scheme_list,
upload_shp_to_pg,
submit_risk_probability_result,
)
from app.services.valve_isolation import analyze_valve_isolation
from app.services.simulation_ops import (
project_management,
scheduling_simulation,
daily_scheduling_simulation,
)
from app.services.leakage_identifier import run_leakage_identification
"""Service package.
__all__ = [
"network_update",
"submit_scada_info",
"create_user",
"delete_user",
"scheme_name_exists",
"store_scheme_info",
"delete_scheme_info",
"query_scheme_list",
"upload_shp_to_pg",
"submit_risk_probability_result",
"project_management",
"scheduling_simulation",
"daily_scheduling_simulation",
"analyze_valve_isolation",
"run_leakage_identification",
]
Keep package initialization lightweight. Import concrete service modules directly,
for example: `from app.services.tjnetwork import open_project`.
"""
+93
View File
@@ -0,0 +1,93 @@
from typing import Any, Literal
import httpx
from pydantic import BaseModel, Field
from app.core.config import settings
Freshness = Literal["noLimit", "oneDay", "oneWeek", "oneMonth", "oneYear"]
class WebSearchRequest(BaseModel):
query: str = Field(..., min_length=1, description="搜索关键词")
freshness: Freshness | str = Field(
default="noLimit",
description="时间范围:noLimit、oneDay、oneWeek、oneMonth、oneYear 或日期范围",
)
summary: bool = Field(default=True, description="是否返回网页摘要")
count: int = Field(default=10, ge=1, le=50, description="返回结果数量")
include: list[str] | None = Field(default=None, description="限定搜索域名")
exclude: list[str] | None = Field(default=None, description="排除搜索域名")
class BochaSearchConfigError(RuntimeError):
pass
class BochaSearchAPIError(RuntimeError):
def __init__(self, status_code: int, detail: Any):
super().__init__("Bocha Web Search API request failed")
self.status_code = status_code
self.detail = detail
def _build_payload(request: WebSearchRequest) -> dict[str, Any]:
payload = request.model_dump(exclude_none=True)
if request.include:
payload["include"] = ",".join(request.include)
if request.exclude:
payload["exclude"] = ",".join(request.exclude)
return payload
async def search_bocha_web(
request: WebSearchRequest,
*,
client: httpx.AsyncClient | None = None,
) -> dict[str, Any]:
if not settings.BOCHA_API_KEY:
raise BochaSearchConfigError("BOCHA_API_KEY is not configured")
headers = {
"Authorization": f"Bearer {settings.BOCHA_API_KEY}",
"Content-Type": "application/json",
}
payload = _build_payload(request)
if client is not None:
response = await client.post(
settings.BOCHA_WEB_SEARCH_URL,
headers=headers,
json=payload,
)
return _parse_response(response)
async with httpx.AsyncClient(
timeout=settings.BOCHA_WEB_SEARCH_TIMEOUT_SECONDS
) as managed_client:
response = await managed_client.post(
settings.BOCHA_WEB_SEARCH_URL,
headers=headers,
json=payload,
)
return _parse_response(response)
def _parse_response(response: httpx.Response) -> dict[str, Any]:
try:
response.raise_for_status()
except httpx.HTTPStatusError as exc:
raise BochaSearchAPIError(
exc.response.status_code,
_response_detail(exc.response),
) from exc
return response.json()
def _response_detail(response: httpx.Response) -> Any:
try:
return response.json()
except ValueError:
return response.text
+115
View File
@@ -0,0 +1,115 @@
import asyncio
import importlib.util
from pathlib import Path
import httpx
import pytest
def _load_web_search_module():
module_path = (
Path(__file__).resolve().parents[2] / "app" / "services" / "web_search.py"
)
spec = importlib.util.spec_from_file_location("tests_web_search_under_test", module_path)
module = importlib.util.module_from_spec(spec)
assert spec and spec.loader
spec.loader.exec_module(module)
return module
web_search = _load_web_search_module()
class FakeClient:
def __init__(self, response):
self.response = response
self.calls = []
async def post(self, url, *, headers, json):
self.calls.append({"url": url, "headers": headers, "json": json})
return self.response
def test_search_bocha_web_posts_expected_payload(monkeypatch):
monkeypatch.setattr(web_search.settings, "BOCHA_API_KEY", "sk-test")
monkeypatch.setattr(
web_search.settings,
"BOCHA_WEB_SEARCH_URL",
"https://api.bochaai.com/v1/web-search",
)
response = httpx.Response(
200,
json={"data": {"webPages": {"value": []}}},
request=httpx.Request("POST", "https://api.bochaai.com/v1/web-search"),
)
client = FakeClient(response)
result = asyncio.run(
web_search.search_bocha_web(
web_search.WebSearchRequest(
query="天津水务",
freshness="oneWeek",
summary=True,
count=5,
include=["example.com", "news.example.com"],
exclude=["spam.example.com"],
),
client=client,
)
)
assert result == {"data": {"webPages": {"value": []}}}
assert client.calls == [
{
"url": "https://api.bochaai.com/v1/web-search",
"headers": {
"Authorization": "Bearer sk-test",
"Content-Type": "application/json",
},
"json": {
"query": "天津水务",
"freshness": "oneWeek",
"summary": True,
"count": 5,
"include": "example.com,news.example.com",
"exclude": "spam.example.com",
},
}
]
def test_search_bocha_web_requires_api_key(monkeypatch):
monkeypatch.setattr(web_search.settings, "BOCHA_API_KEY", "")
with pytest.raises(web_search.BochaSearchConfigError):
asyncio.run(
web_search.search_bocha_web(
web_search.WebSearchRequest(query="天津水务"),
client=FakeClient(httpx.Response(200, json={})),
)
)
def test_search_bocha_web_surfaces_upstream_error(monkeypatch):
monkeypatch.setattr(web_search.settings, "BOCHA_API_KEY", "sk-test")
response = httpx.Response(
401,
json={"error": "invalid api key"},
request=httpx.Request("POST", "https://api.bochaai.com/v1/web-search"),
)
with pytest.raises(web_search.BochaSearchAPIError) as exc_info:
asyncio.run(
web_search.search_bocha_web(
web_search.WebSearchRequest(query="天津水务"),
client=FakeClient(response),
)
)
assert exc_info.value.status_code == 401
assert exc_info.value.detail == {"error": "invalid api key"}
def test_web_search_request_validates_count_range():
with pytest.raises(ValueError):
web_search.WebSearchRequest(query="天津水务", count=51)