fix(cli): constrain timeseries option values

This commit is contained in:
2026-06-05 13:43:32 +08:00
parent b7872f29a9
commit 9a7aad2d36
6 changed files with 280 additions and 64 deletions
+73 -35
View File
@@ -16,12 +16,56 @@ from .apps import (
)
from .common import emit_api, runtime_context
from .core import CLIError, parse_time_with_timezone, require_network, resolve_scheme
from .option_types import (
CompositeKind,
ElementType,
JUNCTION_TIMESERIES_FIELDS,
SCADA_TIMESERIES_FIELDS,
ScadaListKind,
ScadaSchemaKind,
SimulationQuery,
timeseries_fields_for_element_type,
)
def _scheme_type_option(scheme_type: str | None) -> str:
return scheme_type or "simulation"
def _validate_element_property(element_type: ElementType, property_name: str, *, option_name: str) -> str:
valid_fields = timeseries_fields_for_element_type(element_type)
if property_name not in valid_fields:
raise CLIError(
"CLI 参数错误",
code="INVALID_PROPERTY",
message=f"{option_name} for --type {element_type.value} must be one of: {', '.join(valid_fields)}",
exit_code=2,
)
return property_name
def _validate_node_field(field_name: str, *, option_name: str) -> str:
if field_name not in JUNCTION_TIMESERIES_FIELDS:
raise CLIError(
"CLI 参数错误",
code="INVALID_FIELD",
message=f"{option_name} must be one of: {', '.join(JUNCTION_TIMESERIES_FIELDS)}",
exit_code=2,
)
return field_name
def _validate_scada_field(field_name: str, *, option_name: str) -> str:
if field_name not in SCADA_TIMESERIES_FIELDS:
raise CLIError(
"CLI 参数错误",
code="INVALID_FIELD",
message=f"{option_name} must be one of: {', '.join(SCADA_TIMESERIES_FIELDS)}",
exit_code=2,
)
return field_name
@data_timeseries_realtime_app.command("links")
def data_realtime_links(
ctx: typer.Context,
@@ -66,7 +110,7 @@ def data_realtime_nodes(
def data_realtime_simulation_by_id_time(
ctx: typer.Context,
id: Annotated[str, typer.Option("--id", help="元素 ID")],
type: Annotated[str, typer.Option("--type", help="pipe|junction")],
type: Annotated[ElementType, typer.Option("--type", help="元素类型,仅支持 pipe|junctionlinks/nodes 是子命令")],
time: Annotated[str, typer.Option("--time", help="查询时间")],
) -> None:
emit_api(
@@ -76,7 +120,7 @@ def data_realtime_simulation_by_id_time(
path="/realtime/query/by-id-time",
params={
"id": id,
"type": type,
"type": type.value,
"query_time": parse_time_with_timezone(time, option_name="--time").isoformat(),
},
require_auth=True,
@@ -87,17 +131,18 @@ def data_realtime_simulation_by_id_time(
@data_timeseries_realtime_app.command("simulation-by-time-property")
def data_realtime_simulation_by_time_property(
ctx: typer.Context,
type: Annotated[str, typer.Option("--type", help="pipe|junction")],
type: Annotated[ElementType, typer.Option("--type", help="元素类型,仅支持 pipe|junctionlinks/nodes 是子命令")],
time: Annotated[str, typer.Option("--time", help="查询时间")],
property: Annotated[str, typer.Option("--property", help="属性名")],
property: Annotated[str, typer.Option("--property", help="属性名pipe: flow|friction|headloss|quality|reaction|setting|status|velocityjunction: actual_demand|total_head|pressure|quality")],
) -> None:
property = _validate_element_property(type, property, option_name="--property")
emit_api(
ctx,
summary="读取实时属性聚合数据成功",
method="GET",
path="/realtime/query/by-time-property",
params={
"type": type,
"type": type.value,
"query_time": parse_time_with_timezone(time, option_name="--time").isoformat(),
"property": property,
},
@@ -135,13 +180,14 @@ def data_scheme_links(
def data_scheme_node_field(
ctx: typer.Context,
node: Annotated[str, typer.Option("--node", help="节点 ID")],
field: Annotated[str, typer.Option("--field", help="字段名")],
field: Annotated[str, typer.Option("--field", help="字段名,仅支持 actual_demand|total_head|pressure|quality")],
start_time: Annotated[str, typer.Option("--start-time", help="开始时间")],
end_time: Annotated[str, typer.Option("--end-time", help="结束时间")],
scheme: Annotated[str | None, typer.Option("--scheme", help="方案名称")] = None,
scheme_type: Annotated[str | None, typer.Option("--scheme-type", help="方案类型")] = None,
) -> None:
runtime = runtime_context(ctx)
field = _validate_node_field(field, option_name="--field")
emit_api(
ctx,
summary="读取方案节点字段成功",
@@ -162,22 +208,22 @@ def data_scheme_node_field(
@data_timeseries_scheme_app.command("simulation")
def data_scheme_simulation(
ctx: typer.Context,
query: Annotated[str, typer.Option("--query", help="by-id-time|by-scheme-time-property")],
query: Annotated[SimulationQuery, typer.Option("--query", help="查询模式,仅支持 by-id-time|by-scheme-time-property")],
scheme: Annotated[str | None, typer.Option("--scheme", help="方案名称")] = None,
scheme_type: Annotated[str | None, typer.Option("--scheme-type", help="方案类型")] = None,
id: Annotated[str | None, typer.Option("--id", help="元素 ID")] = None,
time: Annotated[str, typer.Option("--time", help="查询时间")] = "",
type: Annotated[str, typer.Option("--type", help="pipe|junction")] = "pipe",
property: Annotated[str | None, typer.Option("--property", help="属性名")] = None,
type: Annotated[ElementType, typer.Option("--type", help="元素类型,仅支持 pipe|junctionlinks/nodes 是子命令")] = ElementType.PIPE,
property: Annotated[str | None, typer.Option("--property", help="属性名pipe: flow|friction|headloss|quality|reaction|setting|status|velocityjunction: actual_demand|total_head|pressure|quality")] = None,
) -> None:
runtime = runtime_context(ctx)
params = {
"scheme_name": resolve_scheme(runtime, scheme, required=True),
"scheme_type": _scheme_type_option(scheme_type),
"query_time": parse_time_with_timezone(time, option_name="--time").isoformat(),
"type": type,
"type": type.value,
}
if query == "by-id-time":
if query == SimulationQuery.BY_ID_TIME:
if not id:
raise CLIError(
"CLI 参数错误",
@@ -196,7 +242,7 @@ def data_scheme_simulation(
require_project=True,
)
return
if query == "by-scheme-time-property":
if query == SimulationQuery.BY_SCHEME_TIME_PROPERTY:
if not property:
raise CLIError(
"CLI 参数错误",
@@ -204,6 +250,7 @@ def data_scheme_simulation(
message="--property is required for --query by-scheme-time-property",
exit_code=2,
)
property = _validate_element_property(type, property, option_name="--property")
params["property"] = property
emit_api(
ctx,
@@ -215,12 +262,7 @@ def data_scheme_simulation(
require_project=True,
)
return
raise CLIError(
"CLI 参数错误",
code="INVALID_QUERY",
message="--query must be by-id-time or by-scheme-time-property",
exit_code=2,
)
raise AssertionError(f"unreachable query variant: {query}")
@data_timeseries_scada_app.command("query")
@@ -229,7 +271,7 @@ def data_scada_query(
device_id: Annotated[list[str], typer.Option("--device-id", help="设备 ID,可重复")],
start_time: Annotated[str, typer.Option("--start-time", help="开始时间")],
end_time: Annotated[str, typer.Option("--end-time", help="结束时间")],
field: Annotated[str | None, typer.Option("--field", help="字段名")] = None,
field: Annotated[str | None, typer.Option("--field", help="字段名,仅支持 monitored_value|cleaned_value")] = None,
) -> None:
path = "/scada/by-ids-field-time-range" if field else "/scada/by-ids-time-range"
params = {
@@ -238,6 +280,7 @@ def data_scada_query(
"end_time": parse_time_with_timezone(end_time, option_name="--end-time").isoformat(),
}
if field:
field = _validate_scada_field(field, option_name="--field")
params["field"] = field
emit_api(
ctx,
@@ -253,7 +296,7 @@ def data_scada_query(
@data_timeseries_composite_app.callback(invoke_without_command=True)
def data_timeseries_composite(
ctx: typer.Context,
kind: Annotated[str | None, typer.Option("--kind", help="scada-simulation|element-simulation|element-scada")] = None,
kind: Annotated[CompositeKind | None, typer.Option("--kind", help="复合查询类型,仅支持 scada-simulation|element-simulation|element-scada")] = None,
feature: Annotated[list[str] | None, typer.Option("--feature", help="特征值,可重复")] = None,
start_time: Annotated[str | None, typer.Option("--start-time", help="开始时间")] = None,
end_time: Annotated[str | None, typer.Option("--end-time", help="结束时间")] = None,
@@ -277,7 +320,7 @@ def data_timeseries_composite(
"start_time": parse_time_with_timezone(start_time, option_name="--start-time").isoformat(),
"end_time": parse_time_with_timezone(end_time, option_name="--end-time").isoformat(),
}
if kind == "scada-simulation":
if kind == CompositeKind.SCADA_SIMULATION:
if not feature:
raise CLIError(
"CLI 参数错误",
@@ -300,7 +343,7 @@ def data_timeseries_composite(
require_project=True,
)
return
if kind == "element-simulation":
if kind == CompositeKind.ELEMENT_SIMULATION:
if not feature:
raise CLIError(
"CLI 参数错误",
@@ -323,7 +366,7 @@ def data_timeseries_composite(
require_project=True,
)
return
if kind == "element-scada":
if kind == CompositeKind.ELEMENT_SCADA:
if not feature or len(feature) != 1:
raise CLIError(
"CLI 参数错误",
@@ -343,12 +386,7 @@ def data_timeseries_composite(
require_project=True,
)
return
raise CLIError(
"CLI 参数错误",
code="INVALID_KIND",
message="--kind must be scada-simulation, element-simulation, or element-scada",
exit_code=2,
)
raise AssertionError(f"unreachable composite kind: {kind}")
@data_timeseries_composite_app.command("pipeline-health")
@@ -402,10 +440,10 @@ def _scada_mapping(kind: str, action: str) -> tuple[str, dict[str, str]]:
@data_scada_app.command("schema")
def data_scada_schema(
ctx: typer.Context,
kind: Annotated[str, typer.Option("--kind", help="device|device-data|element|info")],
kind: Annotated[ScadaSchemaKind, typer.Option("--kind", help="SCADA 类型,仅支持 device|device-data|element|info")],
) -> None:
runtime = runtime_context(ctx)
path, _ = _scada_mapping(kind, "schema")
path, _ = _scada_mapping(kind.value, "schema")
emit_api(
ctx,
summary="读取 SCADA schema 成功",
@@ -420,11 +458,11 @@ def data_scada_schema(
@data_scada_app.command("get")
def data_scada_get(
ctx: typer.Context,
kind: Annotated[str, typer.Option("--kind", help="device|device-data|element|info")],
kind: Annotated[ScadaSchemaKind, typer.Option("--kind", help="SCADA 类型,仅支持 device|device-data|element|info")],
id: Annotated[str, typer.Option("--id", help="记录 ID")],
) -> None:
runtime = runtime_context(ctx)
path, meta = _scada_mapping(kind, "get")
path, meta = _scada_mapping(kind.value, "get")
params = {"network": require_network(runtime), meta["id_param"]: id}
emit_api(
ctx,
@@ -440,10 +478,10 @@ def data_scada_get(
@data_scada_app.command("list")
def data_scada_list(
ctx: typer.Context,
kind: Annotated[str, typer.Option("--kind", help="device|element|info")],
kind: Annotated[ScadaListKind, typer.Option("--kind", help="SCADA 类型,仅支持 device|element|infodevice-data 无 list 接口")],
) -> None:
runtime = runtime_context(ctx)
path, _ = _scada_mapping(kind, "list")
path, _ = _scada_mapping(kind.value, "list")
emit_api(
ctx,
summary="读取 SCADA 列表成功",