From 1712ecd4c781f3ae7f12d82a36c270ed3bd7ea7e Mon Sep 17 00:00:00 2001
From: Jiang
Date: Tue, 9 Jun 2026 16:13:24 +0800
Subject: [PATCH] feat(api): add web search endpoint

---
Review notes (ignored by git am):
- _build_payload no longer forwards empty include/exclude lists as JSON arrays.
- Transport failures and non-JSON success bodies are mapped to
  BochaSearchAPIError instead of escaping as unhandled exceptions.
- The blob hashes on the index lines of app/services/web_search.py and
  tests/unit/test_web_search.py are carried over from the previous revision
  of this patch and do not match the revised content.

 .env.example                       |   8 +
 app/api/v1/endpoints/web_search.py |  29 +++++
 app/api/v1/router.py               |   2 +
 app/core/config.py                 |   5 +
 app/services/__init__.py           |  39 +------
 app/services/web_search.py         | 137 ++++++++++++++++++++++++
 tests/unit/test_web_search.py      | 165 +++++++++++++++++++++++++++++
 7 files changed, 350 insertions(+), 35 deletions(-)
 create mode 100644 app/api/v1/endpoints/web_search.py
 create mode 100644 app/services/web_search.py
 create mode 100644 tests/unit/test_web_search.py

diff --git a/.env.example b/.env.example
index dd9267b..54c973d 100644
--- a/.env.example
+++ b/.env.example
@@ -48,3 +48,11 @@ METADATA_DB_PASSWORD="password"
 KEYCLOAK_PUBLIC_KEY="-----BEGIN PUBLIC KEY-----\n...\n-----END PUBLIC KEY-----"
 KEYCLOAK_ALGORITHM=RS256
 KEYCLOAK_AUDIENCE="account"
+
+
+# ============================================
+# Bocha Web Search API
+# ============================================
+BOCHA_API_KEY="sk-your-bocha-api-key"
+BOCHA_WEB_SEARCH_URL="https://api.bochaai.com/v1/web-search"
+BOCHA_WEB_SEARCH_TIMEOUT_SECONDS=30
diff --git a/app/api/v1/endpoints/web_search.py b/app/api/v1/endpoints/web_search.py
new file mode 100644
index 0000000..d3e2675
--- /dev/null
+++ b/app/api/v1/endpoints/web_search.py
@@ -0,0 +1,29 @@
+from typing import Any
+
+from fastapi import APIRouter, HTTPException, status
+
+from app.services.web_search import (
+    BochaSearchAPIError,
+    BochaSearchConfigError,
+    WebSearchRequest,
+    search_bocha_web,
+)
+
+router = APIRouter()
+
+
+@router.post(
+    "/web-search",
+    summary="Web Search",
+    description="调用 Bocha Web Search API 获取实时网页搜索结果",
+)
+async def web_search(request: WebSearchRequest) -> dict[str, Any]:
+    try:
+        return await search_bocha_web(request)
+    except BochaSearchConfigError as exc:
+        raise HTTPException(
+            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
+            detail=str(exc),
+        ) from exc
+    except BochaSearchAPIError as exc:
+        raise HTTPException(status_code=exc.status_code, detail=exc.detail) from exc
diff --git a/app/api/v1/router.py b/app/api/v1/router.py
index f0c286e..4e018cf 100644
--- a/app/api/v1/router.py
+++ b/app/api/v1/router.py
@@ -18,6 +18,7 @@ from app.api.v1.endpoints import (
     user_management,  # 新增:用户管理
     audit,  # 新增:审计日志
     meta,
+    web_search,
 )
 from app.api.v1.endpoints.network import (
     general,
@@ -93,6 +94,7 @@ api_router.include_router(schemes.router, tags=["Schemes"])
 api_router.include_router(misc.router, tags=["Misc"])
 api_router.include_router(risk.router, tags=["Risk"])
 api_router.include_router(cache.router, tags=["Cache"])
+api_router.include_router(web_search.router, tags=["Web Search"])
 api_router.include_router(leakage.router, prefix="/leakage", tags=["Leakage"])
 api_router.include_router(
     burst_detection.router, prefix="/burst-detection", tags=["Burst Detection"]
diff --git a/app/core/config.py b/app/core/config.py
index 3975143..49ccdaf 100644
--- a/app/core/config.py
+++ b/app/core/config.py
@@ -64,6 +64,11 @@ class Settings(BaseSettings):
     KEYCLOAK_ALGORITHM: str = "RS256"
     KEYCLOAK_AUDIENCE: str = ""
 
+    # Bocha Web Search API
+    BOCHA_API_KEY: str = ""
+    BOCHA_WEB_SEARCH_URL: str = "https://api.bochaai.com/v1/web-search"
+    BOCHA_WEB_SEARCH_TIMEOUT_SECONDS: float = 30.0
+
     @property
     def SQLALCHEMY_DATABASE_URI(self) -> str:
         db_password = quote_plus(self.DB_PASSWORD)
diff --git a/app/services/__init__.py b/app/services/__init__.py
index 645a38a..1c6f317 100644
--- a/app/services/__init__.py
+++ b/app/services/__init__.py
@@ -1,36 +1,5 @@
-from app.services.network_import import network_update, submit_scada_info
-from app.services.scheme_management import (
-    create_user,
-    delete_user,
-    scheme_name_exists,
-    store_scheme_info,
-    delete_scheme_info,
-    query_scheme_list,
-    upload_shp_to_pg,
-    submit_risk_probability_result,
-)
-from app.services.valve_isolation import analyze_valve_isolation
-from app.services.simulation_ops import (
-    project_management,
-    scheduling_simulation,
-    daily_scheduling_simulation,
-)
-from app.services.leakage_identifier import run_leakage_identification
+"""Service package.
 
-__all__ = [
-    "network_update",
-    "submit_scada_info",
-    "create_user",
-    "delete_user",
-    "scheme_name_exists",
-    "store_scheme_info",
-    "delete_scheme_info",
-    "query_scheme_list",
-    "upload_shp_to_pg",
-    "submit_risk_probability_result",
-    "project_management",
-    "scheduling_simulation",
-    "daily_scheduling_simulation",
-    "analyze_valve_isolation",
-    "run_leakage_identification",
-]
+Keep package initialization lightweight. Import concrete service modules directly,
+for example: `from app.services.tjnetwork import open_project`.
+"""
diff --git a/app/services/web_search.py b/app/services/web_search.py
new file mode 100644
index 0000000..dc98efa
--- /dev/null
+++ b/app/services/web_search.py
@@ -0,0 +1,137 @@
+"""Client for the Bocha Web Search API."""
+
+from typing import Any, Literal
+
+import httpx
+from pydantic import BaseModel, Field
+
+from app.core.config import settings
+
+
+Freshness = Literal["noLimit", "oneDay", "oneWeek", "oneMonth", "oneYear"]
+
+
+class WebSearchRequest(BaseModel):
+    """Request body accepted by the web search endpoint."""
+
+    query: str = Field(..., min_length=1, description="搜索关键词")
+    freshness: Freshness | str = Field(
+        default="noLimit",
+        description="时间范围:noLimit、oneDay、oneWeek、oneMonth、oneYear 或日期范围",
+    )
+    summary: bool = Field(default=True, description="是否返回网页摘要")
+    count: int = Field(default=10, ge=1, le=50, description="返回结果数量")
+    include: list[str] | None = Field(default=None, description="限定搜索域名")
+    exclude: list[str] | None = Field(default=None, description="排除搜索域名")
+
+
+class BochaSearchConfigError(RuntimeError):
+    """Raised when BOCHA_API_KEY is not configured."""
+
+
+class BochaSearchAPIError(RuntimeError):
+    """Raised when the Bocha request fails or its response is unusable."""
+
+    def __init__(self, status_code: int, detail: Any):
+        super().__init__("Bocha Web Search API request failed")
+        self.status_code = status_code
+        self.detail = detail
+
+
+def _build_payload(request: WebSearchRequest) -> dict[str, Any]:
+    """Build the JSON body sent to Bocha from the validated request."""
+    # Domain filters go upstream as comma-separated strings. Keep them out of
+    # the dump so an empty list is dropped instead of sent as a JSON array.
+    payload = request.model_dump(
+        exclude_none=True,
+        exclude={"include", "exclude"},
+    )
+    if request.include:
+        payload["include"] = ",".join(request.include)
+    if request.exclude:
+        payload["exclude"] = ",".join(request.exclude)
+    return payload
+
+
+async def _post_search(
+    client: httpx.AsyncClient,
+    headers: dict[str, str],
+    payload: dict[str, Any],
+) -> httpx.Response:
+    """POST the search payload; map transport failures to BochaSearchAPIError."""
+    try:
+        return await client.post(
+            settings.BOCHA_WEB_SEARCH_URL,
+            headers=headers,
+            json=payload,
+        )
+    except httpx.TimeoutException as exc:
+        # TimeoutException subclasses RequestError, so it must be caught first.
+        raise BochaSearchAPIError(
+            504, "Bocha Web Search API request timed out"
+        ) from exc
+    except httpx.RequestError as exc:
+        raise BochaSearchAPIError(
+            502, "Bocha Web Search API is unreachable"
+        ) from exc
+
+
+async def search_bocha_web(
+    request: WebSearchRequest,
+    *,
+    client: httpx.AsyncClient | None = None,
+) -> dict[str, Any]:
+    """Run a Bocha web search and return the decoded JSON response.
+
+    Uses ``client`` when given; otherwise opens a short-lived client with the
+    configured timeout.
+
+    Raises:
+        BochaSearchConfigError: if BOCHA_API_KEY is empty.
+        BochaSearchAPIError: on transport failure, an error status, or a
+            response body that is not JSON.
+    """
+    if not settings.BOCHA_API_KEY:
+        raise BochaSearchConfigError("BOCHA_API_KEY is not configured")
+
+    headers = {
+        "Authorization": f"Bearer {settings.BOCHA_API_KEY}",
+        "Content-Type": "application/json",
+    }
+    payload = _build_payload(request)
+
+    if client is not None:
+        response = await _post_search(client, headers, payload)
+    else:
+        async with httpx.AsyncClient(
+            timeout=settings.BOCHA_WEB_SEARCH_TIMEOUT_SECONDS
+        ) as managed_client:
+            response = await _post_search(managed_client, headers, payload)
+    return _parse_response(response)
+
+
+def _parse_response(response: httpx.Response) -> dict[str, Any]:
+    """Return the JSON body, raising BochaSearchAPIError when it is unusable."""
+    try:
+        response.raise_for_status()
+    except httpx.HTTPStatusError as exc:
+        raise BochaSearchAPIError(
+            exc.response.status_code,
+            _response_detail(exc.response),
+        ) from exc
+
+    try:
+        return response.json()
+    except ValueError as exc:
+        # A successful reply that is not JSON must not escape as ValueError.
+        raise BochaSearchAPIError(
+            502, "Bocha Web Search API returned a non-JSON response"
+        ) from exc
+
+
+def _response_detail(response: httpx.Response) -> Any:
+    """Return the decoded JSON body, or the raw text when it is not JSON."""
+    try:
+        return response.json()
+    except ValueError:
+        return response.text
diff --git a/tests/unit/test_web_search.py b/tests/unit/test_web_search.py
new file mode 100644
index 0000000..f1f2c4a
--- /dev/null
+++ b/tests/unit/test_web_search.py
@@ -0,0 +1,165 @@
+import asyncio
+import importlib.util
+from pathlib import Path
+
+import httpx
+import pytest
+
+
+def _load_web_search_module():
+    module_path = (
+        Path(__file__).resolve().parents[2] / "app" / "services" / "web_search.py"
+    )
+    spec = importlib.util.spec_from_file_location("tests_web_search_under_test", module_path)
+    assert spec and spec.loader
+    module = importlib.util.module_from_spec(spec)
+    spec.loader.exec_module(module)
+    return module
+
+
+web_search = _load_web_search_module()
+
+
+class FakeClient:
+    def __init__(self, response):
+        self.response = response
+        self.calls = []
+
+    async def post(self, url, *, headers, json):
+        self.calls.append({"url": url, "headers": headers, "json": json})
+        return self.response
+
+
+class FailingClient:
+    def __init__(self, error):
+        self.error = error
+
+    async def post(self, url, *, headers, json):
+        raise self.error
+
+
+def test_search_bocha_web_posts_expected_payload(monkeypatch):
+    monkeypatch.setattr(web_search.settings, "BOCHA_API_KEY", "sk-test")
+    monkeypatch.setattr(
+        web_search.settings,
+        "BOCHA_WEB_SEARCH_URL",
+        "https://api.bochaai.com/v1/web-search",
+    )
+    response = httpx.Response(
+        200,
+        json={"data": {"webPages": {"value": []}}},
+        request=httpx.Request("POST", "https://api.bochaai.com/v1/web-search"),
+    )
+    client = FakeClient(response)
+
+    result = asyncio.run(
+        web_search.search_bocha_web(
+            web_search.WebSearchRequest(
+                query="天津水务",
+                freshness="oneWeek",
+                summary=True,
+                count=5,
+                include=["example.com", "news.example.com"],
+                exclude=["spam.example.com"],
+            ),
+            client=client,
+        )
+    )
+
+    assert result == {"data": {"webPages": {"value": []}}}
+    assert client.calls == [
+        {
+            "url": "https://api.bochaai.com/v1/web-search",
+            "headers": {
+                "Authorization": "Bearer sk-test",
+                "Content-Type": "application/json",
+            },
+            "json": {
+                "query": "天津水务",
+                "freshness": "oneWeek",
+                "summary": True,
+                "count": 5,
+                "include": "example.com,news.example.com",
+                "exclude": "spam.example.com",
+            },
+        }
+    ]
+
+
+def test_search_bocha_web_requires_api_key(monkeypatch):
+    monkeypatch.setattr(web_search.settings, "BOCHA_API_KEY", "")
+
+    with pytest.raises(web_search.BochaSearchConfigError):
+        asyncio.run(
+            web_search.search_bocha_web(
+                web_search.WebSearchRequest(query="天津水务"),
+                client=FakeClient(httpx.Response(200, json={})),
+            )
+        )
+
+
+def test_search_bocha_web_surfaces_upstream_error(monkeypatch):
+    monkeypatch.setattr(web_search.settings, "BOCHA_API_KEY", "sk-test")
+    response = httpx.Response(
+        401,
+        json={"error": "invalid api key"},
+        request=httpx.Request("POST", "https://api.bochaai.com/v1/web-search"),
+    )
+
+    with pytest.raises(web_search.BochaSearchAPIError) as exc_info:
+        asyncio.run(
+            web_search.search_bocha_web(
+                web_search.WebSearchRequest(query="天津水务"),
+                client=FakeClient(response),
+            )
+        )
+
+    assert exc_info.value.status_code == 401
+    assert exc_info.value.detail == {"error": "invalid api key"}
+
+
+def test_search_bocha_web_maps_timeout_to_gateway_timeout(monkeypatch):
+    monkeypatch.setattr(web_search.settings, "BOCHA_API_KEY", "sk-test")
+
+    with pytest.raises(web_search.BochaSearchAPIError) as exc_info:
+        asyncio.run(
+            web_search.search_bocha_web(
+                web_search.WebSearchRequest(query="天津水务"),
+                client=FailingClient(httpx.ReadTimeout("timed out")),
+            )
+        )
+
+    assert exc_info.value.status_code == 504
+
+
+def test_search_bocha_web_rejects_non_json_success(monkeypatch):
+    monkeypatch.setattr(web_search.settings, "BOCHA_API_KEY", "sk-test")
+    response = httpx.Response(
+        200,
+        text="not json",
+        request=httpx.Request("POST", "https://api.bochaai.com/v1/web-search"),
+    )
+
+    with pytest.raises(web_search.BochaSearchAPIError) as exc_info:
+        asyncio.run(
+            web_search.search_bocha_web(
+                web_search.WebSearchRequest(query="天津水务"),
+                client=FakeClient(response),
+            )
+        )
+
+    assert exc_info.value.status_code == 502
+
+
+def test_build_payload_omits_empty_domain_filters():
+    payload = web_search._build_payload(
+        web_search.WebSearchRequest(query="天津水务", include=[], exclude=[])
+    )
+
+    assert "include" not in payload
+    assert "exclude" not in payload
+
+
+def test_web_search_request_validates_count_range():
+    with pytest.raises(ValueError):
+        web_search.WebSearchRequest(query="天津水务", count=51)