新增监测点优化方案创建API

This commit is contained in:
JIANG
2025-11-03 16:25:06 +08:00
parent 1ad3535924
commit cf6d388155

64
main.py
View File

@@ -3993,7 +3993,7 @@ async def fastapi_pump_failure(data: PumpFailureState) -> str:
# pressure_sensor_placement_sensitivity api 50
############################################################
# 2025/05/17
class Pressure_Sensor_Placement_Sensitivity(BaseModel):
class Pressure_Sensor_Placement(BaseModel):
name: str
scheme_name: str
sensor_number: int
@@ -4003,7 +4003,7 @@ class Pressure_Sensor_Placement_Sensitivity(BaseModel):
@app.post("/pressure_sensor_placement_sensitivity/")
async def fastapi_pressure_sensor_placement_sensitivity(
data: Pressure_Sensor_Placement_Sensitivity,
data: Pressure_Sensor_Placement,
) -> None:
item = data.dict()
pressure_sensor_placement_sensitivity(
@@ -4015,6 +4015,66 @@ async def fastapi_pressure_sensor_placement_sensitivity(
)
@app.post("/pressure_sensor_placement_kmeans/")
async def fastapi_pressure_sensor_placement_kmeans(
data: Pressure_Sensor_Placement,
) -> None:
item = data.dict()
pressure_sensor_placement_kmeans(
name=item["name"],
scheme_name=item["scheme_name"],
sensor_number=item["sensor_number"],
min_diameter=item["min_diameter"],
username=item["username"],
)
# 后续改进合并两个接口为一个增加method、sensor_type参数选择方法
class Pressure_Sensor_Placement_Request(BaseModel):
network: str
scheme_name: str
sensor_type: str
method: str # "sensitivity" 或 "kmeans"
sensor_count: int
min_diameter: int = 0
username: str
@app.post("/sensorplacementscheme/create")
async def fastapi_pressure_sensor_placement(
data: Pressure_Sensor_Placement_Request,
) -> dict[str, str]:
item = data.dict()
# 验证方法参数
if item["method"] not in ["sensitivity", "kmeans"]:
raise HTTPException(
status_code=400, detail="Invalid method. Must be 'sensitivity' or 'kmeans'"
)
try:
if item["method"] == "sensitivity":
pressure_sensor_placement_sensitivity(
name=item["network"],
scheme_name=item["scheme_name"],
sensor_number=item["sensor_count"],
min_diameter=item["min_diameter"],
username=item["username"],
)
elif item["method"] == "kmeans":
pressure_sensor_placement_kmeans(
name=item["network"],
scheme_name=item["scheme_name"],
sensor_number=item["sensor_count"],
min_diameter=item["min_diameter"],
username=item["username"],
)
return {"status": "success", "method": item["method"]}
except Exception as e:
raise HTTPException(status_code=500, detail=f"执行失败: {str(e)}")
class Item(BaseModel):
str_info: str
dict_info: Optional[dict] = None