from __future__ import annotations from typing import Annotated import typer from .apps import ( data_scada_app, data_scheme_app, data_timeseries_composite_app, data_timeseries_realtime_app, data_timeseries_scada_app, data_timeseries_scheme_app, ) from .common import emit_api, runtime_context from .core import CLIError, parse_time_with_timezone, require_network, resolve_scheme from .option_types import ( CompositeKind, ElementType, JUNCTION_TIMESERIES_FIELDS, SCADA_TIMESERIES_FIELDS, ScadaListKind, SimulationQuery, timeseries_fields_for_element_type, ) def _scheme_type_option(scheme_type: str | None) -> str: return scheme_type or "simulation" def _validate_element_property(element_type: ElementType, property_name: str, *, option_name: str) -> str: valid_fields = timeseries_fields_for_element_type(element_type) if property_name not in valid_fields: raise CLIError( "CLI 参数错误", code="INVALID_PROPERTY", message=f"{option_name} for --type {element_type.value} must be one of: {', '.join(valid_fields)}", exit_code=2, ) return property_name def _validate_node_field(field_name: str, *, option_name: str) -> str: if field_name not in JUNCTION_TIMESERIES_FIELDS: raise CLIError( "CLI 参数错误", code="INVALID_FIELD", message=f"{option_name} must be one of: {', '.join(JUNCTION_TIMESERIES_FIELDS)}", exit_code=2, ) return field_name def _validate_scada_field(field_name: str, *, option_name: str) -> str: if field_name not in SCADA_TIMESERIES_FIELDS: raise CLIError( "CLI 参数错误", code="INVALID_FIELD", message=f"{option_name} must be one of: {', '.join(SCADA_TIMESERIES_FIELDS)}", exit_code=2, ) return field_name @data_timeseries_realtime_app.command("links") def data_realtime_links( ctx: typer.Context, start_time: Annotated[str, typer.Option("--start-time", help="开始时间")], end_time: Annotated[str, typer.Option("--end-time", help="结束时间")], ) -> None: emit_api( ctx, summary="读取实时管道数据成功", method="GET", path="/realtime/links", params={ "start_time": parse_time_with_timezone(start_time, option_name="--start-time").isoformat(), "end_time": parse_time_with_timezone(end_time, option_name="--end-time").isoformat(), }, require_auth=True, require_project=True, ) @data_timeseries_realtime_app.command("nodes") def data_realtime_nodes( ctx: typer.Context, start_time: Annotated[str, typer.Option("--start-time", help="开始时间")], end_time: Annotated[str, typer.Option("--end-time", help="结束时间")], ) -> None: emit_api( ctx, summary="读取实时节点数据成功", method="GET", path="/realtime/nodes", params={ "start_time": parse_time_with_timezone(start_time, option_name="--start-time").isoformat(), "end_time": parse_time_with_timezone(end_time, option_name="--end-time").isoformat(), }, require_auth=True, require_project=True, ) @data_timeseries_realtime_app.command("simulation-by-id-time") def data_realtime_simulation_by_id_time( ctx: typer.Context, id: Annotated[str, typer.Option("--id", help="元素 ID")], type: Annotated[ElementType, typer.Option("--type", help="元素类型,仅支持 pipe|junction;links/nodes 是子命令")], time: Annotated[str, typer.Option("--time", help="查询时间")], ) -> None: emit_api( ctx, summary="读取实时模拟数据成功", method="GET", path="/realtime/query/by-id-time", params={ "id": id, "type": type.value, "query_time": parse_time_with_timezone(time, option_name="--time").isoformat(), }, require_auth=True, require_project=True, ) @data_timeseries_realtime_app.command("simulation-by-time-property") def data_realtime_simulation_by_time_property( ctx: typer.Context, type: Annotated[ElementType, typer.Option("--type", help="元素类型,仅支持 pipe|junction;links/nodes 是子命令")], time: Annotated[str, typer.Option("--time", help="查询时间")], property: Annotated[str, typer.Option("--property", help="属性名;pipe: flow|friction|headloss|quality|reaction|setting|status|velocity;junction: actual_demand|total_head|pressure|quality")], ) -> None: property = _validate_element_property(type, property, option_name="--property") emit_api( ctx, summary="读取实时属性聚合数据成功", method="GET", path="/realtime/query/by-time-property", params={ "type": type.value, "query_time": parse_time_with_timezone(time, option_name="--time").isoformat(), "property": property, }, require_auth=True, require_project=True, ) @data_timeseries_scheme_app.command("links") def data_scheme_links( ctx: typer.Context, start_time: Annotated[str, typer.Option("--start-time", help="开始时间")], end_time: Annotated[str, typer.Option("--end-time", help="结束时间")], scheme: Annotated[str | None, typer.Option("--scheme", help="方案名称")] = None, scheme_type: Annotated[str | None, typer.Option("--scheme-type", help="方案类型")] = None, ) -> None: runtime = runtime_context(ctx) emit_api( ctx, summary="读取方案管道数据成功", method="GET", path="/scheme/links", params={ "scheme_name": resolve_scheme(runtime, scheme, required=True), "scheme_type": _scheme_type_option(scheme_type), "start_time": parse_time_with_timezone(start_time, option_name="--start-time").isoformat(), "end_time": parse_time_with_timezone(end_time, option_name="--end-time").isoformat(), }, require_auth=True, require_project=True, ) @data_timeseries_scheme_app.command("node-field") def data_scheme_node_field( ctx: typer.Context, node: Annotated[str, typer.Option("--node", help="节点 ID")], field: Annotated[str, typer.Option("--field", help="字段名,仅支持 actual_demand|total_head|pressure|quality")], start_time: Annotated[str, typer.Option("--start-time", help="开始时间")], end_time: Annotated[str, typer.Option("--end-time", help="结束时间")], scheme: Annotated[str | None, typer.Option("--scheme", help="方案名称")] = None, scheme_type: Annotated[str | None, typer.Option("--scheme-type", help="方案类型")] = None, ) -> None: runtime = runtime_context(ctx) field = _validate_node_field(field, option_name="--field") emit_api( ctx, summary="读取方案节点字段成功", method="GET", path=f"/scheme/nodes/{node}/field", params={ "field": field, "scheme_name": resolve_scheme(runtime, scheme, required=True), "scheme_type": _scheme_type_option(scheme_type), "start_time": parse_time_with_timezone(start_time, option_name="--start-time").isoformat(), "end_time": parse_time_with_timezone(end_time, option_name="--end-time").isoformat(), }, require_auth=True, require_project=True, ) @data_timeseries_scheme_app.command("simulation") def data_scheme_simulation( ctx: typer.Context, query: Annotated[SimulationQuery, typer.Option("--query", help="查询模式,仅支持 by-id-time|by-scheme-time-property")], scheme: Annotated[str | None, typer.Option("--scheme", help="方案名称")] = None, scheme_type: Annotated[str | None, typer.Option("--scheme-type", help="方案类型")] = None, id: Annotated[str | None, typer.Option("--id", help="元素 ID")] = None, time: Annotated[str, typer.Option("--time", help="查询时间")] = "", type: Annotated[ElementType, typer.Option("--type", help="元素类型,仅支持 pipe|junction;links/nodes 是子命令")] = ElementType.PIPE, property: Annotated[str | None, typer.Option("--property", help="属性名;pipe: flow|friction|headloss|quality|reaction|setting|status|velocity;junction: actual_demand|total_head|pressure|quality")] = None, ) -> None: runtime = runtime_context(ctx) params = { "scheme_name": resolve_scheme(runtime, scheme, required=True), "scheme_type": _scheme_type_option(scheme_type), "query_time": parse_time_with_timezone(time, option_name="--time").isoformat(), "type": type.value, } if query == SimulationQuery.BY_ID_TIME: if not id: raise CLIError( "CLI 参数错误", code="ID_REQUIRED", message="--id is required for --query by-id-time", exit_code=2, ) params["id"] = id emit_api( ctx, summary="读取方案单点模拟数据成功", method="GET", path="/scheme/query/by-id-time", params=params, require_auth=True, require_project=True, ) return if query == SimulationQuery.BY_SCHEME_TIME_PROPERTY: if not property: raise CLIError( "CLI 参数错误", code="PROPERTY_REQUIRED", message="--property is required for --query by-scheme-time-property", exit_code=2, ) property = _validate_element_property(type, property, option_name="--property") params["property"] = property emit_api( ctx, summary="读取方案属性聚合数据成功", method="GET", path="/scheme/query/by-scheme-time-property", params=params, require_auth=True, require_project=True, ) return raise AssertionError(f"unreachable query variant: {query}") @data_timeseries_scada_app.command("query") def data_scada_query( ctx: typer.Context, device_id: Annotated[list[str], typer.Option("--device-id", help="设备 ID,可重复")], start_time: Annotated[str, typer.Option("--start-time", help="开始时间")], end_time: Annotated[str, typer.Option("--end-time", help="结束时间")], field: Annotated[str | None, typer.Option("--field", help="字段名,仅支持 monitored_value|cleaned_value")] = None, ) -> None: path = "/scada/by-ids-field-time-range" if field else "/scada/by-ids-time-range" params = { "device_ids": ",".join(device_id), "start_time": parse_time_with_timezone(start_time, option_name="--start-time").isoformat(), "end_time": parse_time_with_timezone(end_time, option_name="--end-time").isoformat(), } if field: field = _validate_scada_field(field, option_name="--field") params["field"] = field emit_api( ctx, summary="读取 SCADA 时序成功", method="GET", path=path, params=params, require_auth=True, require_project=True, ) @data_timeseries_composite_app.callback(invoke_without_command=True) def data_timeseries_composite( ctx: typer.Context, kind: Annotated[CompositeKind | None, typer.Option("--kind", help="复合查询类型,仅支持 scada-simulation|element-simulation|element-scada")] = None, feature: Annotated[list[str] | None, typer.Option("--feature", help="特征值,可重复")] = None, start_time: Annotated[str | None, typer.Option("--start-time", help="开始时间")] = None, end_time: Annotated[str | None, typer.Option("--end-time", help="结束时间")] = None, pipe: Annotated[str | None, typer.Option("--pipe", help="pipeline-health 用管道 ID")] = None, scheme: Annotated[str | None, typer.Option("--scheme", help="方案名称")] = None, scheme_type: Annotated[str | None, typer.Option("--scheme-type", help="方案类型")] = None, use_cleaned: Annotated[bool, typer.Option("--use-cleaned", help="element-scada 使用清洗值")] = False, ) -> None: _ = pipe if ctx.invoked_subcommand is not None: return if not kind or not start_time or not end_time: raise CLIError( "CLI 参数错误", code="INVALID_COMPOSITE_ARGS", message="composite query requires --kind, --start-time, and --end-time", exit_code=2, ) runtime = runtime_context(ctx) params = { "start_time": parse_time_with_timezone(start_time, option_name="--start-time").isoformat(), "end_time": parse_time_with_timezone(end_time, option_name="--end-time").isoformat(), } if kind == CompositeKind.SCADA_SIMULATION: if not feature: raise CLIError( "CLI 参数错误", code="FEATURE_REQUIRED", message="--feature is required for scada-simulation", exit_code=2, ) params["device_ids"] = ",".join(feature) scheme_name = resolve_scheme(runtime, scheme) if scheme_name: params["scheme_name"] = scheme_name params["scheme_type"] = _scheme_type_option(scheme_type) emit_api( ctx, summary="读取复合 SCADA-模拟数据成功", method="GET", path="/composite/scada-simulation", params=params, require_auth=True, require_project=True, ) return if kind == CompositeKind.ELEMENT_SIMULATION: if not feature: raise CLIError( "CLI 参数错误", code="FEATURE_REQUIRED", message="--feature is required for element-simulation", exit_code=2, ) params["feature_infos"] = ",".join(feature) scheme_name = resolve_scheme(runtime, scheme) if scheme_name: params["scheme_name"] = scheme_name params["scheme_type"] = _scheme_type_option(scheme_type) emit_api( ctx, summary="读取复合元素模拟数据成功", method="GET", path="/composite/element-simulation", params=params, require_auth=True, require_project=True, ) return if kind == CompositeKind.ELEMENT_SCADA: if not feature or len(feature) != 1: raise CLIError( "CLI 参数错误", code="FEATURE_REQUIRED", message="element-scada requires exactly one --feature as element_id", exit_code=2, ) params["element_id"] = feature[0] params["use_cleaned"] = use_cleaned emit_api( ctx, summary="读取元素关联 SCADA 数据成功", method="GET", path="/composite/element-scada", params=params, require_auth=True, require_project=True, ) return raise AssertionError(f"unreachable composite kind: {kind}") @data_timeseries_composite_app.command("pipeline-health") def data_composite_pipeline_health( ctx: typer.Context, pipe: Annotated[str, typer.Option("--pipe", help="管道 ID")], start_time: Annotated[str, typer.Option("--start-time", help="开始时间")], end_time: Annotated[str, typer.Option("--end-time", help="结束时间")], ) -> None: _ = pipe, start_time emit_api( ctx, summary="读取管道健康预测成功", method="GET", path="/composite/pipeline-health-prediction", params={ "network_name": require_network(runtime_context(ctx)), "query_time": parse_time_with_timezone(end_time, option_name="--end-time").isoformat(), }, require_auth=True, require_project=True, require_network_ctx=True, ) def _scada_mapping(kind: str, action: str) -> tuple[str, dict[str, str]]: mapping = { ("info", "get"): ("/getscadainfo/", {"id_param": "id"}), ("info", "list"): ("/getallscadainfo/", {}), } result = mapping.get((kind, action)) if result is None: raise CLIError( "CLI 参数错误", code="INVALID_SCADA_KIND", message=f"unsupported scada {action} kind: {kind}", exit_code=2, ) return result @data_scada_app.command("get") def data_scada_get( ctx: typer.Context, kind: Annotated[ScadaListKind, typer.Option("--kind", help="SCADA 类型,仅支持 info")], id: Annotated[str, typer.Option("--id", help="记录 ID")], ) -> None: runtime = runtime_context(ctx) path, meta = _scada_mapping(kind.value, "get") params = {"network": require_network(runtime), meta["id_param"]: id} emit_api( ctx, summary="读取 SCADA 数据成功", method="GET", path=path, params=params, require_auth=True, require_network_ctx=True, ) @data_scada_app.command("list") def data_scada_list( ctx: typer.Context, kind: Annotated[ScadaListKind, typer.Option("--kind", help="SCADA 类型,仅支持 info")], ) -> None: runtime = runtime_context(ctx) path, _ = _scada_mapping(kind.value, "list") emit_api( ctx, summary="读取 SCADA 列表成功", method="GET", path=path, params={"network": require_network(runtime)}, require_auth=True, require_network_ctx=True, ) @data_scheme_app.command("schema") def data_scheme_schema(ctx: typer.Context) -> None: runtime = runtime_context(ctx) emit_api( ctx, summary="读取方案 schema 成功", method="GET", path="/getschemeschema/", params={"network": require_network(runtime)}, require_auth=True, require_network_ctx=True, ) @data_scheme_app.command("get") def data_scheme_get( ctx: typer.Context, name: Annotated[str, typer.Option("--name", help="方案名称")], ) -> None: runtime = runtime_context(ctx) emit_api( ctx, summary="读取方案成功", method="GET", path="/getscheme/", params={"network": require_network(runtime), "schema_name": name}, require_auth=True, require_network_ctx=True, ) @data_scheme_app.command("list") def data_scheme_list(ctx: typer.Context) -> None: runtime = runtime_context(ctx) emit_api( ctx, summary="读取方案列表成功", method="GET", path="/getallschemes/", params={"network": require_network(runtime)}, require_auth=True, require_network_ctx=True, )