diff --git a/app/services/mcp/router.py b/app/services/mcp/router.py deleted file mode 100644 index ef326cd..0000000 --- a/app/services/mcp/router.py +++ /dev/null @@ -1,152 +0,0 @@ -from fastmcp import FastMCP, Context -from typing import Optional, Dict, Any - -from ..postgresql.database import get_database_instance -from ..postgresql.scada_info import ScadaRepository -from ..postgresql.scheme import SchemeRepository - -# 创建 MCP 服务器实例 -mcp = FastMCP("TJWater PostgreSQL Service", description="访问水务系统 PostgreSQL 数据库操作") - - -# 数据库连接辅助函数 -async def get_database_connection(db_name: Optional[str] = None, ctx: Context = None): - """获取数据库连接,支持通过参数指定数据库名称""" - if ctx: - await ctx.info(f"连接到数据库: {db_name or '默认数据库'}") - - instance = await get_database_instance(db_name) - async with instance.get_connection() as conn: - yield conn - - -@mcp.tool -async def get_scada_info(db_name: Optional[str] = None, ctx: Context = None) -> Dict[str, Any]: - """ - 查询所有 SCADA 信息 - - Args: - db_name: 可选的数据库名称,为空时使用默认数据库 - ctx: MCP 上下文,用于日志记录 - """ - try: - if ctx: - await ctx.info("查询 SCADA 信息...") - - async for conn in get_database_connection(db_name, ctx): - scada_data = await ScadaRepository.get_scadas(conn) - - if ctx: - await ctx.info(f"检索到 {len(scada_data)} 条 SCADA 记录") - - return {"success": True, "data": scada_data, "count": len(scada_data)} - except Exception as e: - error_msg = f"查询 SCADA 信息时发生错误: {str(e)}" - if ctx: - await ctx.error(error_msg) - return {"success": False, "error": error_msg} - - -@mcp.tool -async def get_scheme_list(db_name: Optional[str] = None, ctx: Context = None) -> Dict[str, Any]: - """ - 查询所有方案信息 - - Args: - db_name: 可选的数据库名称,为空时使用默认数据库 - ctx: MCP 上下文,用于日志记录 - """ - try: - if ctx: - await ctx.info("查询方案信息...") - - async for conn in get_database_connection(db_name, ctx): - scheme_data = await SchemeRepository.get_schemes(conn) - - if ctx: - await ctx.info(f"检索到 {len(scheme_data)} 条方案记录") - - return {"success": True, "data": scheme_data, "count": len(scheme_data)} - except Exception as e: - error_msg = f"查询方案信息时发生错误: {str(e)}" - if ctx: - await ctx.error(error_msg) - return {"success": False, "error": error_msg} - - -@mcp.tool -async def get_burst_locate_results(db_name: Optional[str] = None, ctx: Context = None) -> Dict[str, Any]: - """ - 查询所有爆管定位结果 - - Args: - db_name: 可选的数据库名称,为空时使用默认数据库 - ctx: MCP 上下文,用于日志记录 - """ - try: - if ctx: - await ctx.info("查询爆管定位结果...") - - async for conn in get_database_connection(db_name, ctx): - burst_data = await SchemeRepository.get_burst_locate_results(conn) - - if ctx: - await ctx.info(f"检索到 {len(burst_data)} 条爆管记录") - - return {"success": True, "data": burst_data, "count": len(burst_data)} - except Exception as e: - error_msg = f"查询爆管定位结果时发生错误: {str(e)}" - if ctx: - await ctx.error(error_msg) - return {"success": False, "error": error_msg} - - -@mcp.tool -async def get_burst_locate_result_by_incident( - burst_incident: str, - db_name: Optional[str] = None, - ctx: Context = None -) -> Dict[str, Any]: - """ - 根据 burst_incident 查询爆管定位结果 - - Args: - burst_incident: 爆管事件标识符 - db_name: 可选的数据库名称,为空时使用默认数据库 - ctx: MCP 上下文,用于日志记录 - """ - try: - if ctx: - await ctx.info(f"查询爆管事件 {burst_incident} 的结果...") - - async for conn in get_database_connection(db_name, ctx): - result = await SchemeRepository.get_burst_locate_result_by_incident( - conn, burst_incident - ) - - if ctx: - await ctx.info("检索到爆管事件数据") - - return result - except Exception as e: - error_msg = f"根据 burst_incident 查询爆管定位结果时发生错误: {str(e)}" - if ctx: - await ctx.error(error_msg) - return {"success": False, "error": error_msg} - - -# 添加静态配置资源 -@mcp.resource("config://database/supported_databases") -def get_supported_databases(): - """列出支持的数据库配置""" - return ["default", "backup", "analytics"] - - -@mcp.resource("config://api/version") -def get_api_version(): - """获取 API 版本""" - return "1.0.0" - - -if __name__ == "__main__": - mcp.run() \ No newline at end of file