Files
TJWaterServerBinary/cli/tjwater_cli/commands_analysis.py
T

525 lines
19 KiB
Python

from __future__ import annotations

from datetime import timedelta
from pathlib import Path
from typing import Annotated
from urllib.parse import quote

import typer

from .apps import (
    analysis_app,
    analysis_burst_detection_app,
    analysis_burst_detection_schemes_app,
    analysis_burst_location_app,
    analysis_burst_location_schemes_app,
    analysis_leakage_app,
    analysis_leakage_schemes_app,
    analysis_risk_app,
    analysis_sensor_placement_app,
    simulation_app,
)
from .common import emit_api, runtime_context
from .core import (
    CLIError,
    emit_success,
    parse_burst_file,
    parse_optional_dataset_file,
    parse_time_with_timezone,
    parse_valve_setting_file,
    request_json,
    require_network,
    require_username,
    resolve_scheme,
)
from .option_types import DataSource, ValveMode
# `simulation run`: trigger a manual simulation for the active network.
#
# Builds the JSON body for POST /runsimulationmanuallybydate/ from
# --start-time and --duration and hands it to emit_api, together with
# follow-up commands for reading realtime link/node timeseries.
#
# NOTE: documented with comments rather than a docstring on purpose; Typer
# would surface a docstring as this command's --help text.
@simulation_app.command("run")
def simulation_run(
    ctx: typer.Context,
    start_time: Annotated[str, typer.Option("--start-time", help="RFC3339 开始时间")],
    duration: Annotated[int, typer.Option("--duration", help="持续分钟数")],
) -> None:
    runtime = runtime_context(ctx)
    network = require_network(runtime)
    parsed = parse_time_with_timezone(start_time, option_name="--start-time")

    # The request is sent with a whole-second start. Derive the suggested
    # query window from that same truncated instant so the follow-up commands
    # cover exactly the window that was submitted (previously a fractional
    # --start-time produced hints offset from the submitted start).
    window_start = parsed.replace(microsecond=0)
    start_iso = window_start.isoformat()
    end_iso = (window_start + timedelta(minutes=duration)).isoformat()

    body = {
        "name": network,
        "start_time": start_iso,
        "duration": duration,
    }
    emit_api(
        ctx,
        summary="触发模拟成功",
        method="POST",
        path="/runsimulationmanuallybydate/",
        json_body=body,
        require_auth=True,
        require_network_ctx=True,
        next_commands=[
            f"tjwater-cli data timeseries realtime links --start-time {start_iso} --end-time {end_iso}",
            f"tjwater-cli data timeseries realtime nodes --start-time {start_iso} --end-time {end_iso}",
        ],
    )
# `analysis burst`: run a burst analysis for the active network.
#
# Reads burst IDs and sizes from --burst-file, resolves the (required)
# scheme name, and issues GET /burst_analysis/ through emit_api. The
# follow-up hints point at the scheme that was used.
# (Comments instead of a docstring: Typer would show a docstring in --help.)
@analysis_app.command("burst")
def analysis_burst(
    ctx: typer.Context,
    start_time: Annotated[str, typer.Option("--start-time", help="RFC3339 开始时间")],
    duration: Annotated[int, typer.Option("--duration", help="持续秒数")],
    burst_file: Annotated[Path, typer.Option("--burst-file", help="爆管输入 JSON 文件")],
    scheme: Annotated[str | None, typer.Option("--scheme", help="方案名称")] = None,
) -> None:
    runtime = runtime_context(ctx)

    # Same evaluation order as before: file, scheme, network, then time.
    burst_ids, burst_sizes = parse_burst_file(burst_file)
    scheme_name = resolve_scheme(runtime, scheme, required=True)
    network = require_network(runtime)
    pattern_start = parse_time_with_timezone(start_time, option_name="--start-time")

    query = {
        "network": network,
        "modify_pattern_start_time": pattern_start.isoformat(),
        "burst_ID": burst_ids,
        "burst_size": burst_sizes,
        "modify_total_duration": duration,
        "scheme_name": scheme_name,
    }
    follow_ups = [
        f"tjwater-cli data scheme get --name {scheme_name}",
        "tjwater-cli data scheme list",
    ]
    emit_api(
        ctx,
        summary="爆管分析执行成功",
        method="GET",
        path="/burst_analysis/",
        params=query,
        require_auth=True,
        require_network_ctx=True,
        next_commands=follow_ups,
    )
# `analysis valve`: valve analysis in one of two modes.
#
# * close     -> GET /valve_close_analysis/; needs --start-time and at least
#                one --valve. A missing (or zero) --duration is sent as 900.
# * isolation -> GET /valve_isolation_analysis/; needs at least one
#                --element, and forwards --disabled-valve only when given.
#
# Missing mode-specific arguments raise CLIError with exit_code=2.
# (Comments instead of a docstring: Typer would show a docstring in --help.)
@analysis_app.command("valve")
def analysis_valve(
    ctx: typer.Context,
    mode: Annotated[ValveMode, typer.Option("--mode", help="分析模式,仅支持 close|isolation")],
    start_time: Annotated[str | None, typer.Option("--start-time", help="close 模式需要")] = None,
    valve: Annotated[list[str] | None, typer.Option("--valve", help="阀门 ID,可重复")] = None,
    element: Annotated[list[str] | None, typer.Option("--element", help="isolation 模式的事故元素,可重复")] = None,
    disabled_valve: Annotated[list[str] | None, typer.Option("--disabled-valve", help="故障阀门,可重复")] = None,
    duration: Annotated[int | None, typer.Option("--duration", help="close 模式持续秒数")] = None,
    scheme: Annotated[str | None, typer.Option("--scheme", help="close 模式的方案名称")] = None,
) -> None:
    runtime = runtime_context(ctx)
    network = require_network(runtime)

    if mode == ValveMode.CLOSE:
        if not (start_time and valve):
            raise CLIError(
                "CLI 参数错误",
                code="INVALID_VALVE_CLOSE_ARGS",
                message="close mode requires --start-time and at least one --valve",
                exit_code=2,
            )
        close_start = parse_time_with_timezone(start_time, option_name="--start-time")
        close_query = {
            "network": network,
            "start_time": close_start.isoformat(),
            "valves": valve,
            # Falsy durations (None or 0) fall back to 900.
            "duration": duration if duration else 900,
            "scheme_name": resolve_scheme(runtime, scheme, required=True),
        }
        emit_api(
            ctx,
            summary="阀门关闭分析执行成功",
            method="GET",
            path="/valve_close_analysis/",
            params=close_query,
            require_auth=True,
            require_network_ctx=True,
        )
    elif mode == ValveMode.ISOLATION:
        if not element:
            raise CLIError(
                "CLI 参数错误",
                code="INVALID_VALVE_ISOLATION_ARGS",
                message="isolation mode requires at least one --element",
                exit_code=2,
            )
        isolation_query = {"network": network, "accident_element": element}
        if disabled_valve:
            isolation_query["disabled_valves"] = disabled_valve
        emit_api(
            ctx,
            summary="阀门隔离分析执行成功",
            method="GET",
            path="/valve_isolation_analysis/",
            params=isolation_query,
            require_auth=True,
            require_network_ctx=True,
        )
    else:
        # Reached only if mode compares equal to neither handled member.
        raise AssertionError(f"unreachable valve mode: {mode}")
# `analysis flushing`: run a flushing analysis for the active network.
#
# Valve IDs and their openings come from --valve-setting-file; the request is
# GET /flushing_analysis/ via emit_api. A missing (or zero) --duration is
# sent as 900.
# (Comments instead of a docstring: Typer would show a docstring in --help.)
@analysis_app.command("flushing")
def analysis_flushing(
    ctx: typer.Context,
    start_time: Annotated[str, typer.Option("--start-time", help="RFC3339 开始时间")],
    valve_setting_file: Annotated[Path, typer.Option("--valve-setting-file", help="阀门开度 JSON 文件")],
    drainage_node: Annotated[str, typer.Option("--drainage-node", help="排污节点")],
    flow: Annotated[float, typer.Option("--flow", help="冲洗流量")],
    duration: Annotated[int | None, typer.Option("--duration", help="持续秒数")] = None,
    scheme: Annotated[str | None, typer.Option("--scheme", help="方案名称")] = None,
) -> None:
    runtime = runtime_context(ctx)
    valve_ids, valve_openings = parse_valve_setting_file(valve_setting_file)

    # Resolved in the original order: network, start time, then scheme.
    network = require_network(runtime)
    flush_start = parse_time_with_timezone(start_time, option_name="--start-time")
    effective_duration = duration if duration else 900
    scheme_name = resolve_scheme(runtime, scheme, required=True)

    query = {
        "network": network,
        "start_time": flush_start.isoformat(),
        "valves": valve_ids,
        "valves_k": valve_openings,
        "drainage_node_ID": drainage_node,
        "flush_flow": flow,
        "duration": effective_duration,
        "scheme_name": scheme_name,
    }
    emit_api(
        ctx,
        summary="冲洗分析执行成功",
        method="GET",
        path="/flushing_analysis/",
        params=query,
        require_auth=True,
        require_network_ctx=True,
    )
# `analysis age`: run a water-age analysis for the active network.
#
# Sends network, start time (ISO 8601) and duration as query parameters of
# GET /age_analysis/ via emit_api.
# (Comments instead of a docstring: Typer would show a docstring in --help.)
@analysis_app.command("age")
def analysis_age(
    ctx: typer.Context,
    start_time: Annotated[str, typer.Option("--start-time", help="RFC3339 开始时间")],
    duration: Annotated[int, typer.Option("--duration", help="持续秒数")],
) -> None:
    runtime = runtime_context(ctx)
    network = require_network(runtime)
    age_start = parse_time_with_timezone(start_time, option_name="--start-time")
    query = {
        "network": network,
        "start_time": age_start.isoformat(),
        "duration": duration,
    }
    emit_api(
        ctx,
        summary="水龄分析执行成功",
        method="GET",
        path="/age_analysis/",
        params=query,
        require_auth=True,
        require_network_ctx=True,
    )
# `analysis contaminant`: run a contaminant simulation for the active network.
#
# Issues GET /contaminant_simulation/ via emit_api. The "pattern" query
# parameter is included only when --pattern is non-empty.
# (Comments instead of a docstring: Typer would show a docstring in --help.)
@analysis_app.command("contaminant")
def analysis_contaminant(
    ctx: typer.Context,
    start_time: Annotated[str, typer.Option("--start-time", help="RFC3339 开始时间")],
    duration: Annotated[int, typer.Option("--duration", help="持续秒数")],
    source_node: Annotated[str, typer.Option("--source-node", help="污染源节点")],
    concentration: Annotated[float, typer.Option("--concentration", help="浓度")],
    pattern: Annotated[str | None, typer.Option("--pattern", help="模式 ID")] = None,
    scheme: Annotated[str | None, typer.Option("--scheme", help="方案名称")] = None,
) -> None:
    runtime = runtime_context(ctx)
    network = require_network(runtime)
    release_start = parse_time_with_timezone(start_time, option_name="--start-time")
    scheme_name = resolve_scheme(runtime, scheme, required=True)

    required_fields = {
        "network": network,
        "start_time": release_start.isoformat(),
        "source": source_node,
        "concentration": concentration,
        "duration": duration,
        "scheme_name": scheme_name,
    }
    # Optional key goes last, matching the original insertion order.
    optional_fields = {"pattern": pattern} if pattern else {}
    emit_api(
        ctx,
        summary="污染物模拟执行成功",
        method="GET",
        path="/contaminant_simulation/",
        params={**required_fields, **optional_fields},
        require_auth=True,
        require_network_ctx=True,
    )
# `kmeans`: request a k-means pressure-sensor placement.
#
# Posts network, scheme, sensor count, minimum diameter and username as JSON
# to /pressure_sensor_placement_kmeans/ via emit_api.
# (Comments instead of a docstring: Typer would show a docstring in --help.)
@analysis_sensor_placement_app.command("kmeans")
def analysis_sensor_placement_kmeans(
    ctx: typer.Context,
    count: Annotated[int, typer.Option("--count", help="传感器数量")],
    min_diameter: Annotated[int, typer.Option("--min-diameter", help="最小管径")] = 0,
    scheme: Annotated[str | None, typer.Option("--scheme", help="方案名称")] = None,
) -> None:
    runtime = runtime_context(ctx)

    # Resolved in the original order: network, scheme, then username.
    network = require_network(runtime)
    scheme_name = resolve_scheme(runtime, scheme, required=True)
    username = require_username(runtime)

    payload = {
        "name": network,
        "scheme_name": scheme_name,
        "sensor_number": count,
        "min_diameter": min_diameter,
        "username": username,
    }
    emit_api(
        ctx,
        summary="传感器选址执行成功",
        method="POST",
        path="/pressure_sensor_placement_kmeans/",
        json_body=payload,
        require_auth=True,
        require_network_ctx=True,
        require_username_ctx=True,
    )
# `leakage identify`: run leakage identification over a SCADA time window.
#
# Posts network, window start/end (ISO 8601) and scheme name as JSON to
# /leakage/identify/ via emit_api.
# (Comments instead of a docstring: Typer would show a docstring in --help.)
@analysis_leakage_app.command("identify")
def analysis_leakage_identify(
    ctx: typer.Context,
    start_time: Annotated[str, typer.Option("--start-time", help="RFC3339 开始时间")],
    end_time: Annotated[str, typer.Option("--end-time", help="RFC3339 结束时间")],
    scheme: Annotated[str | None, typer.Option("--scheme", help="方案名称")] = None,
) -> None:
    runtime = runtime_context(ctx)
    network = require_network(runtime)
    window_start = parse_time_with_timezone(start_time, option_name="--start-time")
    window_end = parse_time_with_timezone(end_time, option_name="--end-time")
    scheme_name = resolve_scheme(runtime, scheme, required=True)

    payload = {
        "network": network,
        "scada_start": window_start.isoformat(),
        "scada_end": window_end.isoformat(),
        "scheme_name": scheme_name,
    }
    emit_api(
        ctx,
        summary="漏损识别执行成功",
        method="POST",
        path="/leakage/identify/",
        json_body=payload,
        require_auth=True,
        require_network_ctx=True,
    )
# `leakage schemes list`: GET /leakage/schemes/ for the active network.
# (Comments instead of a docstring: Typer would show a docstring in --help.)
@analysis_leakage_schemes_app.command("list")
def analysis_leakage_schemes_list(ctx: typer.Context) -> None:
    query = {"network": require_network(runtime_context(ctx))}
    emit_api(
        ctx,
        summary="读取漏损方案列表成功",
        method="GET",
        path="/leakage/schemes/",
        params=query,
        require_auth=True,
        require_network_ctx=True,
    )
# `leakage schemes get <scheme_name>`: fetch one leakage scheme of the active
# network via GET /leakage/schemes/<scheme_name>.
# (Comments instead of a docstring: Typer would show a docstring in --help.)
@analysis_leakage_schemes_app.command("get")
def analysis_leakage_schemes_get(
    ctx: typer.Context,
    scheme_name: Annotated[str, typer.Argument(help="方案名称")],
) -> None:
    runtime = runtime_context(ctx)
    # The scheme name is free-form user input placed in the URL path.
    # Percent-encode it as a single path segment so characters such as
    # "?", "#", "%", "/" or spaces cannot change the request target.
    # Names made only of unreserved characters are left unchanged.
    encoded_name = quote(scheme_name, safe="")
    emit_api(
        ctx,
        summary="读取漏损方案详情成功",
        method="GET",
        path=f"/leakage/schemes/{encoded_name}",
        params={"network": require_network(runtime)},
        require_auth=True,
        require_network_ctx=True,
    )
# `burst-detection detect`: run burst detection over a SCADA time window.
#
# Posts network, window start/end (ISO 8601) and scheme name as JSON to
# /burst-detection/detect/ via emit_api.
# (Comments instead of a docstring: Typer would show a docstring in --help.)
@analysis_burst_detection_app.command("detect")
def analysis_burst_detection_detect(
    ctx: typer.Context,
    start_time: Annotated[str, typer.Option("--start-time", help="RFC3339 开始时间")],
    end_time: Annotated[str, typer.Option("--end-time", help="RFC3339 结束时间")],
    scheme: Annotated[str | None, typer.Option("--scheme", help="方案名称")] = None,
) -> None:
    runtime = runtime_context(ctx)
    network = require_network(runtime)
    detect_from = parse_time_with_timezone(start_time, option_name="--start-time")
    detect_until = parse_time_with_timezone(end_time, option_name="--end-time")
    scheme_name = resolve_scheme(runtime, scheme, required=True)

    payload = {
        "network": network,
        "scada_start": detect_from.isoformat(),
        "scada_end": detect_until.isoformat(),
        "scheme_name": scheme_name,
    }
    emit_api(
        ctx,
        summary="爆管检测执行成功",
        method="POST",
        path="/burst-detection/detect/",
        json_body=payload,
        require_auth=True,
        require_network_ctx=True,
    )
# `burst-detection schemes list`: GET /burst-detection/schemes/ for the
# active network.
# (Comments instead of a docstring: Typer would show a docstring in --help.)
@analysis_burst_detection_schemes_app.command("list")
def analysis_burst_detection_schemes_list(ctx: typer.Context) -> None:
    query = {"network": require_network(runtime_context(ctx))}
    emit_api(
        ctx,
        summary="读取爆管检测方案列表成功",
        method="GET",
        path="/burst-detection/schemes/",
        params=query,
        require_auth=True,
        require_network_ctx=True,
    )
# `burst-detection schemes get <scheme_name>`: fetch one burst-detection
# scheme of the active network via GET /burst-detection/schemes/<scheme_name>.
# (Comments instead of a docstring: Typer would show a docstring in --help.)
@analysis_burst_detection_schemes_app.command("get")
def analysis_burst_detection_schemes_get(
    ctx: typer.Context,
    scheme_name: Annotated[str, typer.Argument(help="方案名称")],
) -> None:
    runtime = runtime_context(ctx)
    # The scheme name is free-form user input placed in the URL path.
    # Percent-encode it as a single path segment so characters such as
    # "?", "#", "%", "/" or spaces cannot change the request target.
    # Names made only of unreserved characters are left unchanged.
    encoded_name = quote(scheme_name, safe="")
    emit_api(
        ctx,
        summary="读取爆管检测方案详情成功",
        method="GET",
        path=f"/burst-detection/schemes/{encoded_name}",
        params={"network": require_network(runtime)},
        require_auth=True,
        require_network_ctx=True,
    )
# `burst-location locate`: run burst localisation for the active network.
#
# Builds a JSON body for POST /burst-location/locate/ from the options.
# SCADA ID lists are included only when given. From the optional JSON files
# only the keys burst_pressure/normal_pressure (pressure file) and
# burst_flow/normal_flow (flow file) are copied into the body; any other
# keys in those files are ignored.
# (Comments instead of a docstring: Typer would show a docstring in --help.)
@analysis_burst_location_app.command("locate")
def analysis_burst_location_locate(
    ctx: typer.Context,
    start_time: Annotated[str, typer.Option("--start-time", help="RFC3339 开始时间")],
    end_time: Annotated[str, typer.Option("--end-time", help="RFC3339 结束时间")],
    burst_leakage: Annotated[float, typer.Option("--burst-leakage", help="爆管漏水量")],
    scheme: Annotated[str | None, typer.Option("--scheme", help="方案名称")] = None,
    data_source: Annotated[DataSource, typer.Option("--data-source", help="数据来源,仅支持 monitoring|simulation")] = DataSource.MONITORING,
    pressure_scada_id: Annotated[list[str] | None, typer.Option("--pressure-scada-id", help="压力 SCADA ID,可重复")] = None,
    flow_scada_id: Annotated[list[str] | None, typer.Option("--flow-scada-id", help="流量 SCADA ID,可重复")] = None,
    pressure_file: Annotated[Path | None, typer.Option("--pressure-file", help="包含 burst_pressure/normal_pressure 的 JSON 文件")] = None,
    flow_file: Annotated[Path | None, typer.Option("--flow-file", help="包含 burst_flow/normal_flow 的 JSON 文件")] = None,
    use_scada_flow: Annotated[bool, typer.Option("--use-scada-flow", help="启用 SCADA 流量")] = False,
) -> None:
    runtime = runtime_context(ctx)

    # Files are parsed first (pressure, then flow), as in the original.
    pressure_data = parse_optional_dataset_file(pressure_file, label="pressure") or {}
    flow_data = parse_optional_dataset_file(flow_file, label="flow") or {}

    network = require_network(runtime)
    scheme_name = resolve_scheme(runtime, scheme, required=True)
    burst_start = parse_time_with_timezone(start_time, option_name="--start-time")
    burst_end = parse_time_with_timezone(end_time, option_name="--end-time")

    payload = {
        "network": network,
        "scheme_name": scheme_name,
        "data_source": data_source.value,
        "scada_burst_start": burst_start.isoformat(),
        "scada_burst_end": burst_end.isoformat(),
        "burst_leakage": burst_leakage,
        "use_scada_flow": use_scada_flow,
    }
    if pressure_scada_id:
        payload["pressure_scada_ids"] = pressure_scada_id
    if flow_scada_id:
        payload["flow_scada_ids"] = flow_scada_id

    # Copy only the whitelisted keys, preserving each file's own key order.
    whitelisted = (
        (pressure_data, {"burst_pressure", "normal_pressure"}),
        (flow_data, {"burst_flow", "normal_flow"}),
    )
    for dataset, allowed_keys in whitelisted:
        if not isinstance(dataset, dict):
            continue
        for field, content in dataset.items():
            if field in allowed_keys:
                payload[field] = content

    emit_api(
        ctx,
        summary="爆管定位执行成功",
        method="POST",
        path="/burst-location/locate/",
        json_body=payload,
        require_auth=True,
        require_network_ctx=True,
    )
# `burst-location schemes list`: GET /burst-location/schemes/ for the active
# network.
# (Comments instead of a docstring: Typer would show a docstring in --help.)
@analysis_burst_location_schemes_app.command("list")
def analysis_burst_location_schemes_list(ctx: typer.Context) -> None:
    query = {"network": require_network(runtime_context(ctx))}
    emit_api(
        ctx,
        summary="读取爆管定位方案列表成功",
        method="GET",
        path="/burst-location/schemes/",
        params=query,
        require_auth=True,
        require_network_ctx=True,
    )
# `burst-location schemes get <scheme_name>`: fetch one burst-location scheme
# of the active network via GET /burst-location/schemes/<scheme_name>.
# (Comments instead of a docstring: Typer would show a docstring in --help.)
@analysis_burst_location_schemes_app.command("get")
def analysis_burst_location_schemes_get(
    ctx: typer.Context,
    scheme_name: Annotated[str, typer.Argument(help="方案名称")],
) -> None:
    runtime = runtime_context(ctx)
    # The scheme name is free-form user input placed in the URL path.
    # Percent-encode it as a single path segment so characters such as
    # "?", "#", "%", "/" or spaces cannot change the request target.
    # Names made only of unreserved characters are left unchanged.
    encoded_name = quote(scheme_name, safe="")
    emit_api(
        ctx,
        summary="读取爆管定位方案详情成功",
        method="GET",
        path=f"/burst-location/schemes/{encoded_name}",
        params={"network": require_network(runtime)},
        require_auth=True,
        require_network_ctx=True,
    )
# `risk pipe-now`: GET /getpiperiskprobabilitynow/ for one pipe of the active
# network (query parameters: network, pipe_id).
# (Comments instead of a docstring: Typer would show a docstring in --help.)
@analysis_risk_app.command("pipe-now")
def analysis_risk_pipe_now(
    ctx: typer.Context,
    pipe: Annotated[str, typer.Option("--pipe", help="管道 ID")],
) -> None:
    network = require_network(runtime_context(ctx))
    query = {"network": network, "pipe_id": pipe}
    emit_api(
        ctx,
        summary="读取当前管道风险成功",
        method="GET",
        path="/getpiperiskprobabilitynow/",
        params=query,
        require_auth=True,
        require_network_ctx=True,
    )
# `risk pipe-history`: GET /getpiperiskprobability/ for one pipe of the
# active network (query parameters: network, pipe_id).
# (Comments instead of a docstring: Typer would show a docstring in --help.)
@analysis_risk_app.command("pipe-history")
def analysis_risk_pipe_history(
    ctx: typer.Context,
    pipe: Annotated[str, typer.Option("--pipe", help="管道 ID")],
) -> None:
    network = require_network(runtime_context(ctx))
    query = {"network": network, "pipe_id": pipe}
    emit_api(
        ctx,
        summary="读取历史管道风险成功",
        method="GET",
        path="/getpiperiskprobability/",
        params=query,
        require_auth=True,
        require_network_ctx=True,
    )
# `risk network`: combine two GET requests into a single result.
#
# Fetches /getnetworkpiperiskprobabilitynow/ and then
# /getpiperiskprobabilitygeometries/ for the active network, and emits both
# payloads together; the reported duration is the sum of the two values
# request_json returns alongside each payload.
# (Comments instead of a docstring: Typer would show a docstring in --help.)
@analysis_risk_app.command("network")
def analysis_risk_network(ctx: typer.Context) -> None:
    runtime = runtime_context(ctx)
    network = require_network(runtime)

    def fetch(endpoint: str):
        # A fresh params dict per call, as in the original.
        return request_json(
            runtime,
            method="GET",
            path=endpoint,
            params={"network": network},
            require_auth=True,
            require_network_ctx=True,
        )

    risk_values, risk_elapsed = fetch("/getnetworkpiperiskprobabilitynow/")
    pipe_shapes, shapes_elapsed = fetch("/getpiperiskprobabilitygeometries/")

    combined = {"probabilities": risk_values, "geometries": pipe_shapes}
    emit_success(
        summary="读取全网风险成功",
        data=combined,
        ctx=runtime,
        duration_ms=risk_elapsed + shapes_elapsed,
    )