"""Typer commands that issue GET requests for time-series, SCADA, scheme,
extension and miscellaneous data through ``emit_api``.

NOTE: the command functions below are documented with ``#`` comments rather
than docstrings on purpose: Typer surfaces a command function's docstring as
its ``--help`` text, and these commands currently expose no such text.
"""

from __future__ import annotations

from typing import Annotated, Any

import typer

from .apps import (
    data_extension_app,
    data_misc_app,
    data_scada_app,
    data_scheme_app,
    data_timeseries_composite_app,
    data_timeseries_realtime_app,
    data_timeseries_scada_app,
    data_timeseries_scheme_app,
)
from .common import emit_api, runtime_context
from .core import CLIError, parse_time_with_timezone, require_network, resolve_scheme


# ---------------------------------------------------------------------------
# Shared helpers
# ---------------------------------------------------------------------------


def _scheme_type_option(scheme_type: str | None) -> str:
    """Return *scheme_type*, falling back to ``"simulation"`` when it is empty or ``None``."""
    return scheme_type or "simulation"


def _iso_time(value: str, option_name: str) -> str:
    """Parse *value* with ``parse_time_with_timezone`` and return its ``isoformat()`` string.

    *option_name* is forwarded to the parser unchanged (e.g. ``"--start-time"``).
    """
    return parse_time_with_timezone(value, option_name=option_name).isoformat()


def _time_range_params(start_time: str, end_time: str) -> dict[str, str]:
    """Build the ``start_time`` / ``end_time`` query parameters.

    The start time is parsed before the end time, so a problem with
    ``--start-time`` is raised first.
    """
    return {
        "start_time": _iso_time(start_time, "--start-time"),
        "end_time": _iso_time(end_time, "--end-time"),
    }


def _usage_error(code: str, message: str) -> CLIError:
    """Create the ``CLIError`` used for every argument error in this module (exit code 2)."""
    return CLIError("CLI 参数错误", code=code, message=message, exit_code=2)


def _emit_project_get(
    ctx: typer.Context,
    *,
    summary: str,
    path: str,
    params: dict[str, Any],
) -> None:
    """Emit a GET request with ``require_auth=True`` and ``require_project=True``."""
    emit_api(
        ctx,
        summary=summary,
        method="GET",
        path=path,
        params=params,
        require_auth=True,
        require_project=True,
    )


def _emit_network_get(
    ctx: typer.Context,
    *,
    summary: str,
    path: str,
    extra_params: dict[str, str] | None = None,
) -> None:
    """Emit a GET request whose first parameter is the current ``network``.

    The network comes from ``require_network(runtime_context(ctx))``;
    *extra_params*, when given, are appended after it. The request is emitted
    with ``require_auth=True`` and ``require_network_ctx=True``.
    """
    params: dict[str, str] = {"network": require_network(runtime_context(ctx))}
    if extra_params:
        params.update(extra_params)
    emit_api(
        ctx,
        summary=summary,
        method="GET",
        path=path,
        params=params,
        require_auth=True,
        require_network_ctx=True,
    )


# ---------------------------------------------------------------------------
# Realtime time-series
# ---------------------------------------------------------------------------


# GET /realtime/links for the given time range.
@data_timeseries_realtime_app.command("links")
def data_realtime_links(
    ctx: typer.Context,
    start_time: Annotated[str, typer.Option("--start-time", help="开始时间")],
    end_time: Annotated[str, typer.Option("--end-time", help="结束时间")],
) -> None:
    _emit_project_get(
        ctx,
        summary="读取实时管道数据成功",
        path="/realtime/links",
        params=_time_range_params(start_time, end_time),
    )


# GET /realtime/nodes for the given time range.
@data_timeseries_realtime_app.command("nodes")
def data_realtime_nodes(
    ctx: typer.Context,
    start_time: Annotated[str, typer.Option("--start-time", help="开始时间")],
    end_time: Annotated[str, typer.Option("--end-time", help="结束时间")],
) -> None:
    _emit_project_get(
        ctx,
        summary="读取实时节点数据成功",
        path="/realtime/nodes",
        params=_time_range_params(start_time, end_time),
    )


# GET /realtime/query/by-id-time for one element (--id, --type) at --time.
@data_timeseries_realtime_app.command("simulation-by-id-time")
def data_realtime_simulation_by_id_time(
    ctx: typer.Context,
    id: Annotated[str, typer.Option("--id", help="元素 ID")],
    type: Annotated[str, typer.Option("--type", help="pipe|junction")],
    time: Annotated[str, typer.Option("--time", help="查询时间")],
) -> None:
    _emit_project_get(
        ctx,
        summary="读取实时模拟数据成功",
        path="/realtime/query/by-id-time",
        params={
            "id": id,
            "type": type,
            "query_time": _iso_time(time, "--time"),
        },
    )


# GET /realtime/query/by-time-property for one --property of a --type at --time.
@data_timeseries_realtime_app.command("simulation-by-time-property")
def data_realtime_simulation_by_time_property(
    ctx: typer.Context,
    type: Annotated[str, typer.Option("--type", help="pipe|junction")],
    time: Annotated[str, typer.Option("--time", help="查询时间")],
    property: Annotated[str, typer.Option("--property", help="属性名")],
) -> None:
    _emit_project_get(
        ctx,
        summary="读取实时属性聚合数据成功",
        path="/realtime/query/by-time-property",
        params={
            "type": type,
            "query_time": _iso_time(time, "--time"),
            "property": property,
        },
    )


# ---------------------------------------------------------------------------
# Scheme time-series
# ---------------------------------------------------------------------------


# GET /scheme/links for a scheme and time range. The scheme name is resolved
# with required=True; the scheme type falls back to "simulation".
@data_timeseries_scheme_app.command("links")
def data_scheme_links(
    ctx: typer.Context,
    start_time: Annotated[str, typer.Option("--start-time", help="开始时间")],
    end_time: Annotated[str, typer.Option("--end-time", help="结束时间")],
    scheme: Annotated[str | None, typer.Option("--scheme", help="方案名称")] = None,
    scheme_type: Annotated[str | None, typer.Option("--scheme-type", help="方案类型")] = None,
) -> None:
    runtime = runtime_context(ctx)
    _emit_project_get(
        ctx,
        summary="读取方案管道数据成功",
        path="/scheme/links",
        params={
            "scheme_name": resolve_scheme(runtime, scheme, required=True),
            "scheme_type": _scheme_type_option(scheme_type),
            **_time_range_params(start_time, end_time),
        },
    )


# GET /scheme/nodes/{node}/field for one field of one node over a time range.
@data_timeseries_scheme_app.command("node-field")
def data_scheme_node_field(
    ctx: typer.Context,
    node: Annotated[str, typer.Option("--node", help="节点 ID")],
    field: Annotated[str, typer.Option("--field", help="字段名")],
    start_time: Annotated[str, typer.Option("--start-time", help="开始时间")],
    end_time: Annotated[str, typer.Option("--end-time", help="结束时间")],
    scheme: Annotated[str | None, typer.Option("--scheme", help="方案名称")] = None,
    scheme_type: Annotated[str | None, typer.Option("--scheme-type", help="方案类型")] = None,
) -> None:
    runtime = runtime_context(ctx)
    _emit_project_get(
        ctx,
        summary="读取方案节点字段成功",
        path=f"/scheme/nodes/{node}/field",
        params={
            "field": field,
            "scheme_name": resolve_scheme(runtime, scheme, required=True),
            "scheme_type": _scheme_type_option(scheme_type),
            **_time_range_params(start_time, end_time),
        },
    )


# Scheme simulation query; --query selects the endpoint:
#   by-id-time               -> /scheme/query/by-id-time               (needs --id)
#   by-scheme-time-property  -> /scheme/query/by-scheme-time-property  (needs --property)
# Raises CLIError (exit code 2) with code ID_REQUIRED, PROPERTY_REQUIRED or
# INVALID_QUERY for argument problems.
@data_timeseries_scheme_app.command("simulation")
def data_scheme_simulation(
    ctx: typer.Context,
    query: Annotated[str, typer.Option("--query", help="by-id-time|by-scheme-time-property")],
    scheme: Annotated[str | None, typer.Option("--scheme", help="方案名称")] = None,
    scheme_type: Annotated[str | None, typer.Option("--scheme-type", help="方案类型")] = None,
    id: Annotated[str | None, typer.Option("--id", help="元素 ID")] = None,
    time: Annotated[str, typer.Option("--time", help="查询时间")] = "",
    type: Annotated[str, typer.Option("--type", help="pipe|junction")] = "pipe",
    property: Annotated[str | None, typer.Option("--property", help="属性名")] = None,
) -> None:
    # Check --query and its companion option first, so these errors are
    # reported before the scheme is resolved or --time is parsed.
    if query == "by-id-time":
        if not id:
            raise _usage_error("ID_REQUIRED", "--id is required for --query by-id-time")
        selector = {"id": id}
        summary = "读取方案单点模拟数据成功"
        path = "/scheme/query/by-id-time"
    elif query == "by-scheme-time-property":
        if not property:
            raise _usage_error(
                "PROPERTY_REQUIRED",
                "--property is required for --query by-scheme-time-property",
            )
        selector = {"property": property}
        summary = "读取方案属性聚合数据成功"
        path = "/scheme/query/by-scheme-time-property"
    else:
        raise _usage_error(
            "INVALID_QUERY",
            "--query must be by-id-time or by-scheme-time-property",
        )

    runtime = runtime_context(ctx)
    # NOTE(review): --time defaults to "" yet is always handed to the parser;
    # presumably the parser rejects an empty value — confirm.
    params = {
        "scheme_name": resolve_scheme(runtime, scheme, required=True),
        "scheme_type": _scheme_type_option(scheme_type),
        "query_time": _iso_time(time, "--time"),
        "type": type,
        **selector,
    }
    _emit_project_get(ctx, summary=summary, path=path, params=params)


# ---------------------------------------------------------------------------
# SCADA time-series
# ---------------------------------------------------------------------------


# SCADA time-series for one or more devices. With --field the request goes to
# /scada/by-ids-field-time-range, otherwise to /scada/by-ids-time-range.
# Device IDs are sent as one comma-joined "device_ids" value.
@data_timeseries_scada_app.command("query")
def data_scada_query(
    ctx: typer.Context,
    device_id: Annotated[list[str], typer.Option("--device-id", help="设备 ID,可重复")],
    start_time: Annotated[str, typer.Option("--start-time", help="开始时间")],
    end_time: Annotated[str, typer.Option("--end-time", help="结束时间")],
    field: Annotated[str | None, typer.Option("--field", help="字段名")] = None,
) -> None:
    params = {
        "device_ids": ",".join(device_id),
        **_time_range_params(start_time, end_time),
    }
    if field:
        params["field"] = field
        path = "/scada/by-ids-field-time-range"
    else:
        path = "/scada/by-ids-time-range"
    _emit_project_get(ctx, summary="读取 SCADA 时序成功", path=path, params=params)


# ---------------------------------------------------------------------------
# Composite time-series
# ---------------------------------------------------------------------------

# Composite kinds that share one request shape:
# kind -> (parameter carrying the comma-joined --feature values, success summary).
_COMPOSITE_SIMULATION_KINDS: dict[str, tuple[str, str]] = {
    "scada-simulation": ("device_ids", "读取复合 SCADA-模拟数据成功"),
    "element-simulation": ("feature_infos", "读取复合元素模拟数据成功"),
}
_COMPOSITE_ELEMENT_SCADA = "element-scada"


# Callback of the composite app; does nothing when a subcommand was invoked.
# Invoked on its own it needs --kind, --start-time and --end-time and sends
# GET /composite/<kind>:
#   scada-simulation / element-simulation: --feature values are comma-joined;
#     scheme_name/scheme_type are sent only when resolve_scheme returns a name.
#   element-scada: exactly one --feature, sent as element_id with use_cleaned.
# Raises CLIError (exit code 2) with code INVALID_COMPOSITE_ARGS, INVALID_KIND
# or FEATURE_REQUIRED for argument problems.
@data_timeseries_composite_app.callback(invoke_without_command=True)
def data_timeseries_composite(
    ctx: typer.Context,
    kind: Annotated[str | None, typer.Option("--kind", help="scada-simulation|element-simulation|element-scada")] = None,
    feature: Annotated[list[str] | None, typer.Option("--feature", help="特征值,可重复")] = None,
    start_time: Annotated[str | None, typer.Option("--start-time", help="开始时间")] = None,
    end_time: Annotated[str | None, typer.Option("--end-time", help="结束时间")] = None,
    pipe: Annotated[str | None, typer.Option("--pipe", help="pipeline-health 用管道 ID")] = None,
    scheme: Annotated[str | None, typer.Option("--scheme", help="方案名称")] = None,
    scheme_type: Annotated[str | None, typer.Option("--scheme-type", help="方案类型")] = None,
    use_cleaned: Annotated[bool, typer.Option("--use-cleaned", help="element-scada 使用清洗值")] = False,
) -> None:
    # --pipe is accepted by the callback but deliberately not used here.
    _ = pipe
    if ctx.invoked_subcommand is not None:
        return

    if not kind or not start_time or not end_time:
        raise _usage_error(
            "INVALID_COMPOSITE_ARGS",
            "composite query requires --kind, --start-time, and --end-time",
        )

    # Check --kind and --feature before resolving the runtime context or
    # parsing timestamps, so these errors are reported first.
    if kind == _COMPOSITE_ELEMENT_SCADA:
        if not feature or len(feature) != 1:
            raise _usage_error(
                "FEATURE_REQUIRED",
                "element-scada requires exactly one --feature as element_id",
            )
    elif kind in _COMPOSITE_SIMULATION_KINDS:
        if not feature:
            raise _usage_error("FEATURE_REQUIRED", f"--feature is required for {kind}")
    else:
        raise _usage_error(
            "INVALID_KIND",
            "--kind must be scada-simulation, element-simulation, or element-scada",
        )

    runtime = runtime_context(ctx)
    params: dict[str, Any] = dict(_time_range_params(start_time, end_time))

    if kind == _COMPOSITE_ELEMENT_SCADA:
        params["element_id"] = feature[0]
        params["use_cleaned"] = use_cleaned
        _emit_project_get(
            ctx,
            summary="读取元素关联 SCADA 数据成功",
            path="/composite/element-scada",
            params=params,
        )
        return

    feature_key, summary = _COMPOSITE_SIMULATION_KINDS[kind]
    params[feature_key] = ",".join(feature)
    scheme_name = resolve_scheme(runtime, scheme)
    if scheme_name:
        params["scheme_name"] = scheme_name
        params["scheme_type"] = _scheme_type_option(scheme_type)
    _emit_project_get(ctx, summary=summary, path=f"/composite/{kind}", params=params)


# GET /composite/pipeline-health-prediction for the current network.
# Only --end-time reaches the request (as query_time); --pipe and --start-time
# are accepted but not sent.
@data_timeseries_composite_app.command("pipeline-health")
def data_composite_pipeline_health(
    ctx: typer.Context,
    pipe: Annotated[str, typer.Option("--pipe", help="管道 ID")],
    start_time: Annotated[str, typer.Option("--start-time", help="开始时间")],
    end_time: Annotated[str, typer.Option("--end-time", help="结束时间")],
) -> None:
    _ = pipe, start_time
    emit_api(
        ctx,
        summary="读取管道健康预测成功",
        method="GET",
        path="/composite/pipeline-health-prediction",
        params={
            "network_name": require_network(runtime_context(ctx)),
            "query_time": _iso_time(end_time, "--end-time"),
        },
        require_auth=True,
        require_project=True,
        require_network_ctx=True,
    )


# ---------------------------------------------------------------------------
# SCADA metadata
# ---------------------------------------------------------------------------

# (kind, action) -> (endpoint path, metadata). For "get" actions the metadata
# holds "id_param": the query parameter name that carries the record ID.
# Treat the entries as read-only; they are shared between calls.
_SCADA_ENDPOINTS: dict[tuple[str, str], tuple[str, dict[str, str]]] = {
    ("device", "schema"): ("/getscadadeviceschema/", {}),
    ("device", "get"): ("/getscadadevice/", {"id_param": "id"}),
    ("device", "list"): ("/getallscadadevices/", {}),
    ("device-data", "schema"): ("/getscadadevicedataschema/", {}),
    ("device-data", "get"): ("/getscadadevicedata/", {"id_param": "device_id"}),
    ("element", "schema"): ("/getscadaelementschema/", {}),
    ("element", "get"): ("/getscadaelement/", {"id_param": "id"}),
    ("element", "list"): ("/getscadaelements/", {}),
    ("info", "schema"): ("/getscadainfoschema/", {}),
    ("info", "get"): ("/getscadainfo/", {"id_param": "id"}),
    ("info", "list"): ("/getallscadainfo/", {}),
}


def _scada_mapping(kind: str, action: str) -> tuple[str, dict[str, str]]:
    """Look up the endpoint for a SCADA *kind* / *action* pair.

    Returns:
        ``(path, meta)`` from ``_SCADA_ENDPOINTS``.

    Raises:
        CLIError: code ``INVALID_SCADA_KIND`` (exit code 2) when the pair has
            no entry, e.g. ``("device-data", "list")``.
    """
    entry = _SCADA_ENDPOINTS.get((kind, action))
    if entry is None:
        raise _usage_error("INVALID_SCADA_KIND", f"unsupported scada {action} kind: {kind}")
    return entry


# GET the schema endpoint for the given SCADA --kind.
@data_scada_app.command("schema")
def data_scada_schema(
    ctx: typer.Context,
    kind: Annotated[str, typer.Option("--kind", help="device|device-data|element|info")],
) -> None:
    path, _ = _scada_mapping(kind, "schema")
    _emit_network_get(ctx, summary="读取 SCADA schema 成功", path=path)


# GET one SCADA record; the parameter name carrying --id depends on --kind.
@data_scada_app.command("get")
def data_scada_get(
    ctx: typer.Context,
    kind: Annotated[str, typer.Option("--kind", help="device|device-data|element|info")],
    id: Annotated[str, typer.Option("--id", help="记录 ID")],
) -> None:
    path, meta = _scada_mapping(kind, "get")
    _emit_network_get(
        ctx,
        summary="读取 SCADA 数据成功",
        path=path,
        extra_params={meta["id_param"]: id},
    )


# GET the list endpoint for the given SCADA --kind.
@data_scada_app.command("list")
def data_scada_list(
    ctx: typer.Context,
    kind: Annotated[str, typer.Option("--kind", help="device|element|info")],
) -> None:
    path, _ = _scada_mapping(kind, "list")
    _emit_network_get(ctx, summary="读取 SCADA 列表成功", path=path)


# ---------------------------------------------------------------------------
# Schemes
# ---------------------------------------------------------------------------


# GET /getschemeschema/ for the current network.
@data_scheme_app.command("schema")
def data_scheme_schema(ctx: typer.Context) -> None:
    _emit_network_get(ctx, summary="读取方案 schema 成功", path="/getschemeschema/")


# GET /getscheme/ for the scheme named by --name.
@data_scheme_app.command("get")
def data_scheme_get(
    ctx: typer.Context,
    name: Annotated[str, typer.Option("--name", help="方案名称")],
) -> None:
    # NOTE(review): the key is "schema_name" (not "scheme_name"); kept as-is —
    # confirm against the backend before renaming.
    _emit_network_get(
        ctx,
        summary="读取方案成功",
        path="/getscheme/",
        extra_params={"schema_name": name},
    )


# GET /getallschemes/ for the current network.
@data_scheme_app.command("list")
def data_scheme_list(ctx: typer.Context) -> None:
    _emit_network_get(ctx, summary="读取方案列表成功", path="/getallschemes/")


# ---------------------------------------------------------------------------
# Extension data
# ---------------------------------------------------------------------------


# GET /getallextensiondatakeys/ for the current network.
@data_extension_app.command("keys")
def data_extension_keys(ctx: typer.Context) -> None:
    _emit_network_get(ctx, summary="读取扩展数据键成功", path="/getallextensiondatakeys/")


# GET /getextensiondata/ for the entry named by --key.
@data_extension_app.command("get")
def data_extension_get(
    ctx: typer.Context,
    key: Annotated[str, typer.Option("--key", help="扩展键")],
) -> None:
    _emit_network_get(
        ctx,
        summary="读取扩展数据成功",
        path="/getextensiondata/",
        extra_params={"key": key},
    )


# GET /getallextensiondata/ for the current network.
@data_extension_app.command("list")
def data_extension_list(ctx: typer.Context) -> None:
    _emit_network_get(ctx, summary="读取扩展数据列表成功", path="/getallextensiondata/")


# ---------------------------------------------------------------------------
# Miscellaneous
# ---------------------------------------------------------------------------


# GET /getallsensorplacements/ for the current network.
@data_misc_app.command("sensor-placements")
def data_misc_sensor_placements(ctx: typer.Context) -> None:
    _emit_network_get(ctx, summary="读取传感器位置成功", path="/getallsensorplacements/")


# GET /getallburstlocateresults/ for the current network.
@data_misc_app.command("burst-location-results")
def data_misc_burst_location_results(ctx: typer.Context) -> None:
    _emit_network_get(ctx, summary="读取爆管定位结果成功", path="/getallburstlocateresults/")