from __future__ import annotations from datetime import timedelta from pathlib import Path from typing import Annotated import typer from .apps import ( analysis_app, analysis_burst_detection_app, analysis_burst_detection_schemes_app, analysis_burst_location_app, analysis_burst_location_schemes_app, analysis_leakage_app, analysis_leakage_schemes_app, analysis_risk_app, analysis_sensor_placement_app, simulation_app, ) from .common import emit_api, runtime_context from .core import ( CLIError, emit_success, parse_burst_file, parse_optional_dataset_file, parse_time_with_timezone, parse_valve_setting_file, request_json, require_network, require_username, resolve_scheme, ) from .option_types import DataSource, ValveMode @simulation_app.command("run") def simulation_run( ctx: typer.Context, start_time: Annotated[str, typer.Option("--start-time", help="RFC3339 开始时间")], duration: Annotated[int, typer.Option("--duration", help="持续分钟数")], ) -> None: runtime = runtime_context(ctx) network = require_network(runtime) parsed = parse_time_with_timezone(start_time, option_name="--start-time") end_time = (parsed + timedelta(minutes=duration)).isoformat() body = { "name": network, "start_time": parsed.replace(microsecond=0).isoformat(), "duration": duration, } emit_api( ctx, summary="触发模拟成功", method="POST", path="/runsimulationmanuallybydate/", json_body=body, require_auth=True, require_network_ctx=True, next_commands=[ f"tjwater-cli data timeseries realtime links --start-time {parsed.isoformat()} --end-time {end_time}", f"tjwater-cli data timeseries realtime nodes --start-time {parsed.isoformat()} --end-time {end_time}", ], ) @analysis_app.command("burst") def analysis_burst( ctx: typer.Context, start_time: Annotated[str, typer.Option("--start-time", help="RFC3339 开始时间")], duration: Annotated[int, typer.Option("--duration", help="持续秒数")], burst_file: Annotated[Path, typer.Option("--burst-file", help="爆管输入 JSON 文件")], scheme: Annotated[str | None, typer.Option("--scheme", help="方案名称")] = None, ) -> None: runtime = runtime_context(ctx) ids, sizes = parse_burst_file(burst_file) scheme_name = resolve_scheme(runtime, scheme, required=True) params = { "network": require_network(runtime), "modify_pattern_start_time": parse_time_with_timezone(start_time, option_name="--start-time").isoformat(), "burst_ID": ids, "burst_size": sizes, "modify_total_duration": duration, "scheme_name": scheme_name, } emit_api( ctx, summary="爆管分析执行成功", method="GET", path="/burst_analysis/", params=params, require_auth=True, require_network_ctx=True, next_commands=[ f"tjwater-cli data scheme get --name {scheme_name}", "tjwater-cli data scheme list", ], ) @analysis_app.command("valve") def analysis_valve( ctx: typer.Context, mode: Annotated[ValveMode, typer.Option("--mode", help="分析模式,仅支持 close|isolation")], start_time: Annotated[str | None, typer.Option("--start-time", help="close 模式需要")] = None, valve: Annotated[list[str] | None, typer.Option("--valve", help="阀门 ID,可重复")] = None, element: Annotated[list[str] | None, typer.Option("--element", help="isolation 模式的事故元素,可重复")] = None, disabled_valve: Annotated[list[str] | None, typer.Option("--disabled-valve", help="故障阀门,可重复")] = None, duration: Annotated[int | None, typer.Option("--duration", help="close 模式持续秒数")] = None, scheme: Annotated[str | None, typer.Option("--scheme", help="close 模式的方案名称")] = None, ) -> None: runtime = runtime_context(ctx) network = require_network(runtime) if mode == ValveMode.CLOSE: if not start_time or not valve: raise CLIError( "CLI 参数错误", code="INVALID_VALVE_CLOSE_ARGS", message="close mode requires --start-time and at least one --valve", exit_code=2, ) params = { "network": network, "start_time": parse_time_with_timezone(start_time, option_name="--start-time").isoformat(), "valves": valve, "duration": duration or 900, "scheme_name": resolve_scheme(runtime, scheme, required=True), } emit_api( ctx, summary="阀门关闭分析执行成功", method="GET", path="/valve_close_analysis/", params=params, require_auth=True, require_network_ctx=True, ) return if mode == ValveMode.ISOLATION: if not element: raise CLIError( "CLI 参数错误", code="INVALID_VALVE_ISOLATION_ARGS", message="isolation mode requires at least one --element", exit_code=2, ) params = {"network": network, "accident_element": element} if disabled_valve: params["disabled_valves"] = disabled_valve emit_api( ctx, summary="阀门隔离分析执行成功", method="GET", path="/valve_isolation_analysis/", params=params, require_auth=True, require_network_ctx=True, ) return raise AssertionError(f"unreachable valve mode: {mode}") @analysis_app.command("flushing") def analysis_flushing( ctx: typer.Context, start_time: Annotated[str, typer.Option("--start-time", help="RFC3339 开始时间")], valve_setting_file: Annotated[Path, typer.Option("--valve-setting-file", help="阀门开度 JSON 文件")], drainage_node: Annotated[str, typer.Option("--drainage-node", help="排污节点")], flow: Annotated[float, typer.Option("--flow", help="冲洗流量")], duration: Annotated[int | None, typer.Option("--duration", help="持续秒数")] = None, scheme: Annotated[str | None, typer.Option("--scheme", help="方案名称")] = None, ) -> None: runtime = runtime_context(ctx) valves, openings = parse_valve_setting_file(valve_setting_file) params = { "network": require_network(runtime), "start_time": parse_time_with_timezone(start_time, option_name="--start-time").isoformat(), "valves": valves, "valves_k": openings, "drainage_node_ID": drainage_node, "flush_flow": flow, "duration": duration or 900, "scheme_name": resolve_scheme(runtime, scheme, required=True), } emit_api( ctx, summary="冲洗分析执行成功", method="GET", path="/flushing_analysis/", params=params, require_auth=True, require_network_ctx=True, ) @analysis_app.command("age") def analysis_age( ctx: typer.Context, start_time: Annotated[str, typer.Option("--start-time", help="RFC3339 开始时间")], duration: Annotated[int, typer.Option("--duration", help="持续秒数")], ) -> None: runtime = runtime_context(ctx) emit_api( ctx, summary="水龄分析执行成功", method="GET", path="/age_analysis/", params={ "network": require_network(runtime), "start_time": parse_time_with_timezone(start_time, option_name="--start-time").isoformat(), "duration": duration, }, require_auth=True, require_network_ctx=True, ) @analysis_app.command("contaminant") def analysis_contaminant( ctx: typer.Context, start_time: Annotated[str, typer.Option("--start-time", help="RFC3339 开始时间")], duration: Annotated[int, typer.Option("--duration", help="持续秒数")], source_node: Annotated[str, typer.Option("--source-node", help="污染源节点")], concentration: Annotated[float, typer.Option("--concentration", help="浓度")], pattern: Annotated[str | None, typer.Option("--pattern", help="模式 ID")] = None, scheme: Annotated[str | None, typer.Option("--scheme", help="方案名称")] = None, ) -> None: runtime = runtime_context(ctx) params = { "network": require_network(runtime), "start_time": parse_time_with_timezone(start_time, option_name="--start-time").isoformat(), "source": source_node, "concentration": concentration, "duration": duration, "scheme_name": resolve_scheme(runtime, scheme, required=True), } if pattern: params["pattern"] = pattern emit_api( ctx, summary="污染物模拟执行成功", method="GET", path="/contaminant_simulation/", params=params, require_auth=True, require_network_ctx=True, ) @analysis_sensor_placement_app.command("kmeans") def analysis_sensor_placement_kmeans( ctx: typer.Context, count: Annotated[int, typer.Option("--count", help="传感器数量")], min_diameter: Annotated[int, typer.Option("--min-diameter", help="最小管径")] = 0, scheme: Annotated[str | None, typer.Option("--scheme", help="方案名称")] = None, ) -> None: runtime = runtime_context(ctx) body = { "name": require_network(runtime), "scheme_name": resolve_scheme(runtime, scheme, required=True), "sensor_number": count, "min_diameter": min_diameter, "username": require_username(runtime), } emit_api( ctx, summary="传感器选址执行成功", method="POST", path="/pressure_sensor_placement_kmeans/", json_body=body, require_auth=True, require_network_ctx=True, require_username_ctx=True, ) @analysis_leakage_app.command("identify") def analysis_leakage_identify( ctx: typer.Context, start_time: Annotated[str, typer.Option("--start-time", help="RFC3339 开始时间")], end_time: Annotated[str, typer.Option("--end-time", help="RFC3339 结束时间")], scheme: Annotated[str | None, typer.Option("--scheme", help="方案名称")] = None, ) -> None: runtime = runtime_context(ctx) body = { "network": require_network(runtime), "scada_start": parse_time_with_timezone(start_time, option_name="--start-time").isoformat(), "scada_end": parse_time_with_timezone(end_time, option_name="--end-time").isoformat(), "scheme_name": resolve_scheme(runtime, scheme, required=True), } emit_api( ctx, summary="漏损识别执行成功", method="POST", path="/leakage/identify/", json_body=body, require_auth=True, require_network_ctx=True, ) @analysis_leakage_schemes_app.command("list") def analysis_leakage_schemes_list(ctx: typer.Context) -> None: runtime = runtime_context(ctx) emit_api( ctx, summary="读取漏损方案列表成功", method="GET", path="/leakage/schemes/", params={"network": require_network(runtime)}, require_auth=True, require_network_ctx=True, ) @analysis_leakage_schemes_app.command("get") def analysis_leakage_schemes_get( ctx: typer.Context, scheme_name: Annotated[str, typer.Argument(help="方案名称")], ) -> None: runtime = runtime_context(ctx) emit_api( ctx, summary="读取漏损方案详情成功", method="GET", path=f"/leakage/schemes/{scheme_name}", params={"network": require_network(runtime)}, require_auth=True, require_network_ctx=True, ) @analysis_burst_detection_app.command("detect") def analysis_burst_detection_detect( ctx: typer.Context, start_time: Annotated[str, typer.Option("--start-time", help="RFC3339 开始时间")], end_time: Annotated[str, typer.Option("--end-time", help="RFC3339 结束时间")], scheme: Annotated[str | None, typer.Option("--scheme", help="方案名称")] = None, ) -> None: runtime = runtime_context(ctx) body = { "network": require_network(runtime), "scada_start": parse_time_with_timezone(start_time, option_name="--start-time").isoformat(), "scada_end": parse_time_with_timezone(end_time, option_name="--end-time").isoformat(), "scheme_name": resolve_scheme(runtime, scheme, required=True), } emit_api( ctx, summary="爆管检测执行成功", method="POST", path="/burst-detection/detect/", json_body=body, require_auth=True, require_network_ctx=True, ) @analysis_burst_detection_schemes_app.command("list") def analysis_burst_detection_schemes_list(ctx: typer.Context) -> None: runtime = runtime_context(ctx) emit_api( ctx, summary="读取爆管检测方案列表成功", method="GET", path="/burst-detection/schemes/", params={"network": require_network(runtime)}, require_auth=True, require_network_ctx=True, ) @analysis_burst_detection_schemes_app.command("get") def analysis_burst_detection_schemes_get( ctx: typer.Context, scheme_name: Annotated[str, typer.Argument(help="方案名称")], ) -> None: runtime = runtime_context(ctx) emit_api( ctx, summary="读取爆管检测方案详情成功", method="GET", path=f"/burst-detection/schemes/{scheme_name}", params={"network": require_network(runtime)}, require_auth=True, require_network_ctx=True, ) @analysis_burst_location_app.command("locate") def analysis_burst_location_locate( ctx: typer.Context, start_time: Annotated[str, typer.Option("--start-time", help="RFC3339 开始时间")], end_time: Annotated[str, typer.Option("--end-time", help="RFC3339 结束时间")], burst_leakage: Annotated[float, typer.Option("--burst-leakage", help="爆管漏水量")], scheme: Annotated[str | None, typer.Option("--scheme", help="方案名称")] = None, data_source: Annotated[DataSource, typer.Option("--data-source", help="数据来源,仅支持 monitoring|simulation")] = DataSource.MONITORING, pressure_scada_id: Annotated[list[str] | None, typer.Option("--pressure-scada-id", help="压力 SCADA ID,可重复")] = None, flow_scada_id: Annotated[list[str] | None, typer.Option("--flow-scada-id", help="流量 SCADA ID,可重复")] = None, pressure_file: Annotated[Path | None, typer.Option("--pressure-file", help="包含 burst_pressure/normal_pressure 的 JSON 文件")] = None, flow_file: Annotated[Path | None, typer.Option("--flow-file", help="包含 burst_flow/normal_flow 的 JSON 文件")] = None, use_scada_flow: Annotated[bool, typer.Option("--use-scada-flow", help="启用 SCADA 流量")] = False, ) -> None: runtime = runtime_context(ctx) pressure_payload = parse_optional_dataset_file(pressure_file, label="pressure") or {} flow_payload = parse_optional_dataset_file(flow_file, label="flow") or {} body = { "network": require_network(runtime), "scheme_name": resolve_scheme(runtime, scheme, required=True), "data_source": data_source.value, "scada_burst_start": parse_time_with_timezone(start_time, option_name="--start-time").isoformat(), "scada_burst_end": parse_time_with_timezone(end_time, option_name="--end-time").isoformat(), "burst_leakage": burst_leakage, "use_scada_flow": use_scada_flow, } if pressure_scada_id: body["pressure_scada_ids"] = pressure_scada_id if flow_scada_id: body["flow_scada_ids"] = flow_scada_id if isinstance(pressure_payload, dict): body.update({key: value for key, value in pressure_payload.items() if key in {"burst_pressure", "normal_pressure"}}) if isinstance(flow_payload, dict): body.update({key: value for key, value in flow_payload.items() if key in {"burst_flow", "normal_flow"}}) emit_api( ctx, summary="爆管定位执行成功", method="POST", path="/burst-location/locate/", json_body=body, require_auth=True, require_network_ctx=True, ) @analysis_burst_location_schemes_app.command("list") def analysis_burst_location_schemes_list(ctx: typer.Context) -> None: runtime = runtime_context(ctx) emit_api( ctx, summary="读取爆管定位方案列表成功", method="GET", path="/burst-location/schemes/", params={"network": require_network(runtime)}, require_auth=True, require_network_ctx=True, ) @analysis_burst_location_schemes_app.command("get") def analysis_burst_location_schemes_get( ctx: typer.Context, scheme_name: Annotated[str, typer.Argument(help="方案名称")], ) -> None: runtime = runtime_context(ctx) emit_api( ctx, summary="读取爆管定位方案详情成功", method="GET", path=f"/burst-location/schemes/{scheme_name}", params={"network": require_network(runtime)}, require_auth=True, require_network_ctx=True, ) @analysis_risk_app.command("pipe-now") def analysis_risk_pipe_now( ctx: typer.Context, pipe: Annotated[str, typer.Option("--pipe", help="管道 ID")], ) -> None: runtime = runtime_context(ctx) emit_api( ctx, summary="读取当前管道风险成功", method="GET", path="/getpiperiskprobabilitynow/", params={"network": require_network(runtime), "pipe_id": pipe}, require_auth=True, require_network_ctx=True, ) @analysis_risk_app.command("pipe-history") def analysis_risk_pipe_history( ctx: typer.Context, pipe: Annotated[str, typer.Option("--pipe", help="管道 ID")], ) -> None: runtime = runtime_context(ctx) emit_api( ctx, summary="读取历史管道风险成功", method="GET", path="/getpiperiskprobability/", params={"network": require_network(runtime), "pipe_id": pipe}, require_auth=True, require_network_ctx=True, ) @analysis_risk_app.command("network") def analysis_risk_network(ctx: typer.Context) -> None: runtime = runtime_context(ctx) network = require_network(runtime) probabilities, duration_prob = request_json( runtime, method="GET", path="/getnetworkpiperiskprobabilitynow/", params={"network": network}, require_auth=True, require_network_ctx=True, ) geometries, duration_geo = request_json( runtime, method="GET", path="/getpiperiskprobabilitygeometries/", params={"network": network}, require_auth=True, require_network_ctx=True, ) emit_success( summary="读取全网风险成功", data={"probabilities": probabilities, "geometries": geometries}, ctx=runtime, duration_ms=duration_prob + duration_geo, )