From 27941140003a46266ba5bf48ed0fa0b357e3f7d3 Mon Sep 17 00:00:00 2001 From: Jiang Date: Thu, 5 Feb 2026 10:47:38 +0800 Subject: [PATCH] =?UTF-8?q?=E7=BB=9F=E4=B8=80scheme=5Fname=E5=91=BD?= =?UTF-8?q?=E5=90=8D=E8=A7=84=E5=88=99?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/algorithms/valve_isolation.py | 131 ++++++++++++++++++++++------- app/api/v1/endpoints/data_query.py | 4 +- app/api/v1/endpoints/extension.py | 2 +- app/api/v1/endpoints/simulation.py | 30 +++++-- app/infra/db/influxdb/api.py | 124 +++++++++++++-------------- app/services/simulation.py | 8 +- app/services/valve_isolation.py | 6 +- scripts/main.py | 14 +-- scripts/online_Analysis.py | 22 ++--- 9 files changed, 215 insertions(+), 126 deletions(-) diff --git a/app/algorithms/valve_isolation.py b/app/algorithms/valve_isolation.py index 092f4f8..eaea72d 100644 --- a/app/algorithms/valve_isolation.py +++ b/app/algorithms/valve_isolation.py @@ -1,10 +1,11 @@ from collections import defaultdict, deque +from functools import lru_cache +import time from typing import Any from app.services.tjnetwork import ( get_network_link_nodes, is_node, - is_link, get_link_properties, ) @@ -19,48 +20,102 @@ def _parse_link_entry(link_entry: str) -> tuple[str, str, str, str]: return parts[0], parts[1], parts[2], parts[3] +@lru_cache(maxsize=16) +def _get_network_topology(network: str): + """ + 解析并缓存网络拓扑,大幅减少重复的 API 调用和字符串解析开销。 + 返回: + - pipe_adj: 永久连通的管道/泵邻接表 (dict[str, set]) + - all_valves: 所有阀门字典 {id: (n1, n2)} + - link_lookup: 链路快速查表 {id: (n1, n2, type)} 用于快速定位事故点 + - node_set: 所有已知节点集合 + """ + pipe_adj = defaultdict(set) + all_valves = {} + link_lookup = {} + node_set = set() + + # 此处假设 get_network_link_nodes 获取全网数据 + for link_entry in get_network_link_nodes(network): + link_id, link_type, node1, node2 = _parse_link_entry(link_entry) + link_type_name = str(link_type).lower() + + link_lookup[link_id] = (node1, node2, link_type_name) + node_set.add(node1) + node_set.add(node2) + + if link_type_name == VALVE_LINK_TYPE: + all_valves[link_id] = (node1, node2) + else: + # 只有非阀门(管道/泵)才进入永久连通图 + pipe_adj[node1].add(node2) + pipe_adj[node2].add(node1) + + return pipe_adj, all_valves, link_lookup, node_set + + def valve_isolation_analysis( - network: str, accident_elements: str | list[str] + network: str, accident_elements: str | list[str], disabled_valves: list[str] = None ) -> dict[str, Any]: """ 关阀搜索/分析:基于拓扑结构确定事故隔离所需关阀。 :param network: 模型名称 :param accident_elements: 事故点(节点或管道/泵/阀门ID),可以是单个ID字符串或ID列表 + :param disabled_valves: 故障/无法关闭的阀门ID列表 :return: dict,包含受影响节点、必须关闭阀门、可选阀门等信息 """ + if disabled_valves is None: + disabled_valves_set = set() + else: + disabled_valves_set = set(disabled_valves) + if isinstance(accident_elements, str): target_elements = [accident_elements] else: target_elements = accident_elements + # 1. 获取缓存拓扑 (极快,无 IO) + pipe_adj, all_valves, link_lookup, node_set = _get_network_topology(network) + + # 2. 确定起点,优先查表避免 API 调用 start_nodes = set() - for element in target_elements: - if is_node(network, element): + if element in node_set: start_nodes.add(element) - elif is_link(network, element): - link_props = get_link_properties(network, element) - node1 = link_props.get("node1") - node2 = link_props.get("node2") - if not node1 or not node2: - # 如果是批量处理,可以选择跳过错误或记录错误,这里暂时保持严谨抛出异常 - raise ValueError(f"Accident link {element} missing node endpoints") - start_nodes.add(node1) - start_nodes.add(node2) + elif element in link_lookup: + n1, n2, _ = link_lookup[element] + start_nodes.add(n1) + start_nodes.add(n2) else: - raise ValueError(f"Accident element {element} not found") + # 仅当缓存中没找到时(极少见),才回退到慢速 API + if is_node(network, element): + start_nodes.add(element) + else: + props = get_link_properties(network, element) + n1, n2 = props.get("node1"), props.get("node2") + if n1 and n2: + start_nodes.add(n1) + start_nodes.add(n2) + else: + raise ValueError( + f"Accident element {element} invalid or missing endpoints" + ) - adjacency: dict[str, set[str]] = defaultdict(set) - valve_links: dict[str, tuple[str, str]] = {} - for link_entry in get_network_link_nodes(network): - link_id, link_type, node1, node2 = _parse_link_entry(link_entry) - link_type_name = str(link_type).lower() - if link_type_name == VALVE_LINK_TYPE: - valve_links[link_id] = (node1, node2) - continue - adjacency[node1].add(node2) - adjacency[node2].add(node1) + # 3. 处理故障阀门 (构建临时增量图) + # 我们不修改 cached pipe_adj,而是建立一个 extra_adj + extra_adj = defaultdict(list) + boundary_valves = {} # 当前有效的边界阀门 + for vid, (n1, n2) in all_valves.items(): + if vid in disabled_valves_set: + # 故障阀门:视为连通管道 + extra_adj[n1].append(n2) + extra_adj[n2].append(n1) + else: + # 正常阀门:视为潜在边界 + boundary_valves[vid] = (n1, n2) + + # 4. BFS 搜索 (叠加 pipe_adj 和 extra_adj) affected_nodes: set[str] = set() queue = deque(start_nodes) while queue: @@ -68,18 +123,29 @@ def valve_isolation_analysis( if node in affected_nodes: continue affected_nodes.add(node) - for neighbor in adjacency.get(node, []): - if neighbor not in affected_nodes: - queue.append(neighbor) + # 遍历永久管道邻居 + if node in pipe_adj: + for neighbor in pipe_adj[node]: + if neighbor not in affected_nodes: + queue.append(neighbor) + + # 遍历故障阀门带来的额外邻居 + if node in extra_adj: + for neighbor in extra_adj[node]: + if neighbor not in affected_nodes: + queue.append(neighbor) + + # 5. 结果聚合 must_close_valves: list[str] = [] optional_valves: list[str] = [] - for valve_id, (node1, node2) in valve_links.items(): - in_node1 = node1 in affected_nodes - in_node2 = node2 in affected_nodes - if in_node1 and in_node2: + + for valve_id, (n1, n2) in boundary_valves.items(): + in_n1 = n1 in affected_nodes + in_n2 = n2 in affected_nodes + if in_n1 and in_n2: optional_valves.append(valve_id) - elif in_node1 or in_node2: + elif in_n1 or in_n2: must_close_valves.append(valve_id) must_close_valves.sort() @@ -87,6 +153,7 @@ def valve_isolation_analysis( result = { "accident_elements": target_elements, + "disabled_valves": disabled_valves, "affected_nodes": sorted(affected_nodes), "must_close_valves": must_close_valves, "optional_valves": optional_valves, diff --git a/app/api/v1/endpoints/data_query.py b/app/api/v1/endpoints/data_query.py index 794ae19..c16ad40 100644 --- a/app/api/v1/endpoints/data_query.py +++ b/app/api/v1/endpoints/data_query.py @@ -316,7 +316,7 @@ async def fastapi_query_all_scheme_all_records( return loaded_dict results = influxdb_api.query_scheme_all_record( - scheme_Type=schemetype, scheme_Name=schemename, query_date=querydate + scheme_Type=schemetype, scheme_name=schemename, query_date=querydate ) packed = msgpack.packb(results, default=encode_datetime) redis_client.set(cache_key, packed) @@ -334,7 +334,7 @@ async def fastapi_query_all_scheme_all_records_property( all_results = msgpack.unpackb(data, object_hook=decode_datetime) else: all_results = influxdb_api.query_scheme_all_record( - scheme_Type=schemetype, scheme_Name=schemename, query_date=querydate + scheme_Type=schemetype, scheme_name=schemename, query_date=querydate ) packed = msgpack.packb(all_results, default=encode_datetime) redis_client.set(cache_key, packed) diff --git a/app/api/v1/endpoints/extension.py b/app/api/v1/endpoints/extension.py index 4a2144f..c56ade6 100644 --- a/app/api/v1/endpoints/extension.py +++ b/app/api/v1/endpoints/extension.py @@ -22,7 +22,7 @@ async def get_all_extension_data_endpoint(network: str) -> dict[str, Any]: async def get_extension_data_endpoint(network: str, key: str) -> str | None: return get_extension_data(network, key) -@router.post("/setextensiondata", response_model=None) +@router.post("/setextensiondata/", response_model=None) async def set_extension_data_endpoint(network: str, req: Request) -> ChangeSet: props = await req.json() print(props) diff --git a/app/api/v1/endpoints/simulation.py b/app/api/v1/endpoints/simulation.py index 9a5981a..6c4cd46 100644 --- a/app/api/v1/endpoints/simulation.py +++ b/app/api/v1/endpoints/simulation.py @@ -60,7 +60,7 @@ class BurstAnalysis(BaseModel): modify_fixed_pump_pattern: Optional[dict[str, list]] = None modify_variable_pump_pattern: Optional[dict[str, list]] = None modify_valve_opening: Optional[dict[str, float]] = None - scheme_Name: Optional[str] = None + scheme_name: Optional[str] = None class SchedulingAnalysis(BaseModel): @@ -78,7 +78,7 @@ class PressureRegulation(BaseModel): pump_control: dict tank_init_level: Optional[dict] = None duration: Optional[int] = 900 - scheme_Name: Optional[str] = None + scheme_name: Optional[str] = None class ProjectManagement(BaseModel): @@ -239,9 +239,29 @@ async def fastapi_valve_close_analysis( @router.get("/valve_isolation_analysis/") async def valve_isolation_endpoint( - network: str, accident_element: List[str] = Query(...) + network: str, + accident_element: List[str] = Query(...), + disabled_valves: List[str] = Query(None), ): - return analyze_valve_isolation(network, accident_element) + result = { + "accident_element": "P461309", + "accident_elements": ["P461309"], + "affected_nodes": [ + "J316629_A", + "J317037_B", + "J317060_B", + "J408189_B", + "J499996", + "J524940", + "J535933", + "J58841", + ], + "isolatable": True, + "must_close_valves": ["210521658", "V12974", "V12986", "V12993"], + "optional_valves": [], + } + result = analyze_valve_isolation(network, accident_element, disabled_valves) + return result @router.get("/flushinganalysis/") @@ -342,7 +362,7 @@ async def fastapi_pressure_regulation(data: PressureRegulation) -> str: modify_tank_initial_level=item["tank_init_level"], modify_fixed_pump_pattern=fixed_pump_pattern or None, modify_variable_pump_pattern=variable_pump_pattern or None, - scheme_Name=item["scheme_Name"], + scheme_name=item["scheme_name"], ) return "success" diff --git a/app/infra/db/influxdb/api.py b/app/infra/db/influxdb/api.py index 6926ddc..1ea6663 100644 --- a/app/infra/db/influxdb/api.py +++ b/app/infra/db/influxdb/api.py @@ -405,7 +405,7 @@ def create_and_initialize_buckets(org_name: str) -> None: .tag("date", None) .tag("ID", None) .tag("scheme_Type", None) - .tag("scheme_Name", None) + .tag("scheme_name", None) .field("flow", 0.0) .field("leakage", 0.0) .field("velocity", 0.0) @@ -421,7 +421,7 @@ def create_and_initialize_buckets(org_name: str) -> None: .tag("date", None) .tag("ID", None) .tag("scheme_Type", None) - .tag("scheme_Name", None) + .tag("scheme_name", None) .field("head", 0.0) .field("pressure", 0.0) .field("actualdemand", 0.0) @@ -437,7 +437,7 @@ def create_and_initialize_buckets(org_name: str) -> None: .tag("description", None) .tag("device_ID", None) .tag("scheme_Type", None) - .tag("scheme_Name", None) + .tag("scheme_name", None) .field("monitored_value", 0.0) .field("datacleaning_value", 0.0) .field("scheme_simulation_value", 0.0) @@ -1812,7 +1812,7 @@ def query_scheme_SCADA_data_by_device_ID_and_time( query_ids_list: List[str], query_time: str, scheme_Type: str, - scheme_Name: str, + scheme_name: str, bucket: str = "scheme_simulation_result", ) -> Dict[str, float]: """ @@ -1843,7 +1843,7 @@ def query_scheme_SCADA_data_by_device_ID_and_time( flux_query = f""" from(bucket: "{bucket}") |> range(start: {utc_start_time.isoformat()}, stop: {utc_stop_time.isoformat()}) - |> filter(fn: (r) => r["device_ID"] == "{device_id}" and r["_field"] == "monitored_value" and r["scheme_Type"] == "{scheme_Type}" and r["scheme_Name"] == "{scheme_Name}") + |> filter(fn: (r) => r["device_ID"] == "{device_id}" and r["_field"] == "monitored_value" and r["scheme_Type"] == "{scheme_Type}" and r["scheme_name"] == "{scheme_name}") """ # 执行查询 try: @@ -2585,7 +2585,7 @@ def query_all_scheme_record_by_time_property( flux_query = f""" from(bucket: "{bucket}") |> range(start: {utc_start_time.isoformat()}, stop: {utc_stop_time.isoformat()}) - |> filter(fn: (r) => r["scheme_Name"] == "{scheme_name}" and r["_measurement"] == "{measurement}" and r["_field"] == "{property}") + |> filter(fn: (r) => r["scheme_name"] == "{scheme_name}" and r["_measurement"] == "{measurement}" and r["_field"] == "{property}") """ # 执行查询 tables = query_api.query(flux_query) @@ -2635,7 +2635,7 @@ def query_scheme_simulation_result_by_ID_time( flux_query = f""" from(bucket: "{bucket}") |> range(start: {utc_start_time.isoformat()}, stop: {utc_stop_time.isoformat()}) - |> filter(fn: (r) => r["scheme_Name"] == "{scheme_name}" and r["_measurement"] == "node" and r["ID"] == "{ID}") + |> filter(fn: (r) => r["scheme_name"] == "{scheme_name}" and r["_measurement"] == "node" and r["ID"] == "{ID}") |> pivot( rowKey:["_time"], columnKey:["_field"], @@ -2660,7 +2660,7 @@ def query_scheme_simulation_result_by_ID_time( flux_query = f""" from(bucket: "{bucket}") |> range(start: {utc_start_time.isoformat()}, stop: {utc_stop_time.isoformat()}) - |> filter(fn: (r) => r["scheme_Name"] == "{scheme_name}" and r["_measurement"] == "link" and r["ID"] == "{ID}") + |> filter(fn: (r) => r["scheme_name"] == "{scheme_name}" and r["_measurement"] == "link" and r["ID"] == "{ID}") |> pivot( rowKey:["_time"], columnKey:["_field"], @@ -3228,7 +3228,7 @@ def store_scheme_simulation_result_to_influxdb( scheme_start_time: str, num_periods: int = 1, scheme_Type: str = None, - scheme_Name: str = None, + scheme_name: str = None, bucket: str = "scheme_simulation_result", ): """ @@ -3238,7 +3238,7 @@ def store_scheme_simulation_result_to_influxdb( :param scheme_start_time: (str): 方案模拟开始时间。 :param num_periods: (int): 方案模拟的周期数 :param scheme_Type: (str): 方案类型 - :param scheme_Name: (str): 方案名称 + :param scheme_name: (str): 方案名称 :param bucket: (str): InfluxDB 的 bucket 名称,默认值为 "scheme_simulation_result"。 :return: """ @@ -3299,7 +3299,7 @@ def store_scheme_simulation_result_to_influxdb( .tag("date", date_str) .tag("ID", node_id) .tag("scheme_Type", scheme_Type) - .tag("scheme_Name", scheme_Name) + .tag("scheme_name", scheme_name) .field("head", data.get("head", 0.0)) .field("pressure", data.get("pressure", 0.0)) .field("actualdemand", data.get("demand", 0.0)) @@ -3323,7 +3323,7 @@ def store_scheme_simulation_result_to_influxdb( .tag("date", date_str) .tag("ID", link_id) .tag("scheme_Type", scheme_Type) - .tag("scheme_Name", scheme_Name) + .tag("scheme_name", scheme_name) .field("flow", data.get("flow", 0.0)) .field("velocity", data.get("velocity", 0.0)) .field("headloss", data.get("headloss", 0.0)) @@ -3410,13 +3410,13 @@ def query_corresponding_query_id_and_element_id(name: str) -> None: # 2025/03/11 def fill_scheme_simulation_result_to_SCADA( scheme_Type: str = None, - scheme_Name: str = None, + scheme_name: str = None, query_date: str = None, bucket: str = "scheme_simulation_result", ): """ :param scheme_Type: 方案类型 - :param scheme_Name: 方案名称 + :param scheme_name: 方案名称 :param query_date: 查询日期,格式为 'YYYY-MM-DD' :param bucket: InfluxDB 的 bucket 名称,默认值为 "scheme_simulation_result" :return: @@ -3458,7 +3458,7 @@ def fill_scheme_simulation_result_to_SCADA( for key, value in globals.scheme_source_outflow_ids.items(): scheme_source_outflow_result = query_scheme_curve_by_ID_property( scheme_Type=scheme_Type, - scheme_Name=scheme_Name, + scheme_name=scheme_name, query_date=query_date, ID=value, type="link", @@ -3471,7 +3471,7 @@ def fill_scheme_simulation_result_to_SCADA( .tag("date", query_date) .tag("device_ID", key) .tag("scheme_Type", scheme_Type) - .tag("scheme_Name", scheme_Name) + .tag("scheme_name", scheme_name) .field("monitored_value", data["value"]) .time(data["time"], write_precision="s") ) @@ -3481,7 +3481,7 @@ def fill_scheme_simulation_result_to_SCADA( for key, value in globals.scheme_pipe_flow_ids.items(): scheme_pipe_flow_result = query_scheme_curve_by_ID_property( scheme_Type=scheme_Type, - scheme_Name=scheme_Name, + scheme_name=scheme_name, query_date=query_date, ID=value, type="link", @@ -3493,7 +3493,7 @@ def fill_scheme_simulation_result_to_SCADA( .tag("date", query_date) .tag("device_ID", key) .tag("scheme_Type", scheme_Type) - .tag("scheme_Name", scheme_Name) + .tag("scheme_name", scheme_name) .field("monitored_value", data["value"]) .time(data["time"], write_precision="s") ) @@ -3503,7 +3503,7 @@ def fill_scheme_simulation_result_to_SCADA( for key, value in globals.scheme_pressure_ids.items(): scheme_pressure_result = query_scheme_curve_by_ID_property( scheme_Type=scheme_Type, - scheme_Name=scheme_Name, + scheme_name=scheme_name, query_date=query_date, ID=value, type="node", @@ -3515,7 +3515,7 @@ def fill_scheme_simulation_result_to_SCADA( .tag("date", query_date) .tag("device_ID", key) .tag("scheme_Type", scheme_Type) - .tag("scheme_Name", scheme_Name) + .tag("scheme_name", scheme_name) .field("monitored_value", data["value"]) .time(data["time"], write_precision="s") ) @@ -3525,7 +3525,7 @@ def fill_scheme_simulation_result_to_SCADA( for key, value in globals.scheme_demand_ids.items(): scheme_demand_result = query_scheme_curve_by_ID_property( scheme_Type=scheme_Type, - scheme_Name=scheme_Name, + scheme_name=scheme_name, query_date=query_date, ID=value, type="node", @@ -3537,7 +3537,7 @@ def fill_scheme_simulation_result_to_SCADA( .tag("date", query_date) .tag("device_ID", key) .tag("scheme_Type", scheme_Type) - .tag("scheme_Name", scheme_Name) + .tag("scheme_name", scheme_name) .field("monitored_value", data["value"]) .time(data["time"], write_precision="s") ) @@ -3547,7 +3547,7 @@ def fill_scheme_simulation_result_to_SCADA( for key, value in globals.scheme_quality_ids.items(): scheme_quality_result = query_scheme_curve_by_ID_property( scheme_Type=scheme_Type, - scheme_Name=scheme_Name, + scheme_name=scheme_name, query_date=query_date, ID=value, type="node", @@ -3559,7 +3559,7 @@ def fill_scheme_simulation_result_to_SCADA( .tag("date", query_date) .tag("device_ID", key) .tag("scheme_Type", scheme_Type) - .tag("scheme_Name", scheme_Name) + .tag("scheme_name", scheme_name) .field("monitored_value", data["value"]) .time(data["time"], write_precision="s") ) @@ -3630,14 +3630,14 @@ def query_SCADA_data_curve( # 2025/02/18 def query_scheme_all_record_by_time( scheme_Type: str, - scheme_Name: str, + scheme_name: str, query_time: str, bucket: str = "scheme_simulation_result", ) -> tuple: """ 查询指定方案某一时刻的所有记录,包括‘node'和‘link’,分别以指定格式返回。 :param scheme_Type: 方案类型 - :param scheme_Name: 方案名称 + :param scheme_name: 方案名称 :param query_time: 输入的北京时间,格式为 '2024-11-24T17:30:00+08:00'。 :param bucket: 数据存储的 bucket 名称。 :return: dict: tuple: (node_records, link_records) @@ -3660,7 +3660,7 @@ def query_scheme_all_record_by_time( flux_query = f""" from(bucket: "{bucket}") |> range(start: {utc_start_time.isoformat()}, stop: {utc_stop_time.isoformat()}) - |> filter(fn: (r) => r["scheme_Type"] == "{scheme_Type}" and r["scheme_Name"] == "{scheme_Name}" and r["_measurement"] == "node" or r["_measurement"] == "link") + |> filter(fn: (r) => r["scheme_Type"] == "{scheme_Type}" and r["scheme_name"] == "{scheme_name}" and r["_measurement"] == "node" or r["_measurement"] == "link") |> pivot( rowKey:["_time"], columnKey:["_field"], @@ -3711,7 +3711,7 @@ def query_scheme_all_record_by_time( # 2025/03/04 def query_scheme_all_record_by_time_property( scheme_Type: str, - scheme_Name: str, + scheme_name: str, query_time: str, type: str, property: str, @@ -3720,7 +3720,7 @@ def query_scheme_all_record_by_time_property( """ 查询指定方案某一时刻‘node'或‘link’某一属性值,以指定格式返回。 :param scheme_Type: 方案类型 - :param scheme_Name: 方案名称 + :param scheme_name: 方案名称 :param query_time: 输入的北京时间,格式为 '2024-11-24T17:30:00+08:00'。 :param type: 查询的类型(决定 measurement) :param property: 查询的字段名称(field) @@ -3752,7 +3752,7 @@ def query_scheme_all_record_by_time_property( flux_query = f""" from(bucket: "{bucket}") |> range(start: {utc_start_time.isoformat()}, stop: {utc_stop_time.isoformat()}) - |> filter(fn: (r) => r["scheme_Type"] == "{scheme_Type}" and r["scheme_Name"] == "{scheme_Name}" and r["_measurement"] == "{measurement}" and r["_field"] == "{property}") + |> filter(fn: (r) => r["scheme_Type"] == "{scheme_Type}" and r["scheme_name"] == "{scheme_name}" and r["_measurement"] == "{measurement}" and r["_field"] == "{property}") """ # 执行查询 tables = query_api.query(flux_query) @@ -3768,7 +3768,7 @@ def query_scheme_all_record_by_time_property( # 2025/02/19 def query_scheme_curve_by_ID_property( scheme_Type: str, - scheme_Name: str, + scheme_name: str, query_date: str, ID: str, type: str, @@ -3776,9 +3776,9 @@ def query_scheme_curve_by_ID_property( bucket: str = "scheme_simulation_result", ) -> list: """ - 根据scheme_Type和scheme_Name,查询该模拟方案中,某一node或link的某一属性值的所有时间的结果 + 根据scheme_Type和scheme_name,查询该模拟方案中,某一node或link的某一属性值的所有时间的结果 :param scheme_Type: 方案类型 - :param scheme_Name: 方案名称 + :param scheme_name: 方案名称 :param query_date: 查询日期,格式为 'YYYY-MM-DD' :param ID: 元素的ID :param type: 元素的类型,node或link @@ -3817,7 +3817,7 @@ def query_scheme_curve_by_ID_property( flux_query = f""" from(bucket: "{bucket}") |> range(start: {start_time}, stop: {stop_time}) - |> filter(fn: (r) => r["_measurement"] == "{measurement}" and r["scheme_Type"] == "{scheme_Type}" and r["scheme_Name"] == "{scheme_Name}" and r["ID"] == "{ID}" and r["_field"] == "{property}") + |> filter(fn: (r) => r["_measurement"] == "{measurement}" and r["scheme_Type"] == "{scheme_Type}" and r["scheme_name"] == "{scheme_name}" and r["ID"] == "{ID}" and r["_field"] == "{property}") """ # 执行查询 tables = query_api.query(flux_query) @@ -3833,14 +3833,14 @@ def query_scheme_curve_by_ID_property( # 2025/02/21 def query_scheme_all_record( scheme_Type: str, - scheme_Name: str, + scheme_name: str, query_date: str, bucket: str = "scheme_simulation_result", ) -> tuple: """ 查询指定方案的所有记录,包括‘node'和‘link’,分别以指定格式返回。 :param scheme_Type: 方案类型 - :param scheme_Name: 方案名称 + :param scheme_name: 方案名称 :param query_date: 查询日期,格式为 'YYYY-MM-DD' :param bucket: 数据存储的 bucket 名称。 :return: dict: tuple: (node_records, link_records) @@ -3867,7 +3867,7 @@ def query_scheme_all_record( flux_query = f""" from(bucket: "{bucket}") |> range(start: {utc_start_time.isoformat()}, stop: {utc_stop_time.isoformat()}) - |> filter(fn: (r) => r["scheme_Type"] == "{scheme_Type}" and r["scheme_Name"] == "{scheme_Name}" and r["_measurement"] == "node" or r["_measurement"] == "link") + |> filter(fn: (r) => r["scheme_Type"] == "{scheme_Type}" and r["scheme_name"] == "{scheme_name}" and r["_measurement"] == "node" or r["_measurement"] == "link") |> pivot( rowKey:["_time"], columnKey:["_field"], @@ -3918,7 +3918,7 @@ def query_scheme_all_record( # 2025/03/04 def query_scheme_all_record_property( scheme_Type: str, - scheme_Name: str, + scheme_name: str, query_date: str, type: str, property: str, @@ -3927,7 +3927,7 @@ def query_scheme_all_record_property( """ 查询指定方案的‘node'或‘link’的某一属性值,以指定格式返回。 :param scheme_Type: 方案类型 - :param scheme_Name: 方案名称 + :param scheme_name: 方案名称 :param query_date: 查询日期,格式为 'YYYY-MM-DD' :param type: 查询的类型(决定 measurement) :param property: 查询的字段名称(field) @@ -3964,7 +3964,7 @@ def query_scheme_all_record_property( flux_query = f""" from(bucket: "{bucket}") |> range(start: {start_time}, stop: {stop_time}) - |> filter(fn: (r) => r["scheme_Type"] == "{scheme_Type}" and r["scheme_Name"] == "{scheme_Name}" and r["date"] == "{query_date}" and r["_measurement"] == "{measurement}" and r["_field"] == "{property}") + |> filter(fn: (r) => r["scheme_Type"] == "{scheme_Type}" and r["scheme_name"] == "{scheme_name}" and r["date"] == "{query_date}" and r["_measurement"] == "{measurement}" and r["_field"] == "{property}") """ # 执行查询 tables = query_api.query(flux_query) @@ -4246,7 +4246,7 @@ def export_scheme_simulation_result_to_csv_time( link_data[key]["measurement"] = record.get_measurement() link_data[key]["date"] = record.values.get("date", None) link_data[key]["scheme_Type"] = record.values.get("scheme_Type", None) - link_data[key]["scheme_Name"] = record.values.get("scheme_Name", None) + link_data[key]["scheme_name"] = record.values.get("scheme_name", None) # 构建 Flux 查询语句,查询指定时间范围内的数据 flux_query_node = f""" from(bucket: "{bucket}") @@ -4268,7 +4268,7 @@ def export_scheme_simulation_result_to_csv_time( node_data[key]["measurement"] = record.get_measurement() node_data[key]["date"] = record.values.get("date", None) node_data[key]["scheme_Type"] = record.values.get("scheme_Type", None) - node_data[key]["scheme_Name"] = record.values.get("scheme_Name", None) + node_data[key]["scheme_name"] = record.values.get("scheme_name", None) for key in set(link_data.keys()): row = {"time": key[0], "ID": key[1]} row.update(link_data.get(key, {})) @@ -4289,7 +4289,7 @@ def export_scheme_simulation_result_to_csv_time( "measurement", "date", "scheme_Type", - "scheme_Name", + "scheme_name", "ID", "flow", "leakage", @@ -4312,7 +4312,7 @@ def export_scheme_simulation_result_to_csv_time( "measurement", "date", "scheme_Type", - "scheme_Name", + "scheme_name", "ID", "head", "pressure", @@ -4331,14 +4331,14 @@ def export_scheme_simulation_result_to_csv_time( # 2025/02/18 def export_scheme_simulation_result_to_csv_scheme( scheme_Type: str, - scheme_Name: str, + scheme_name: str, query_date: str, bucket: str = "scheme_simulation_result", ) -> None: """ 导出influxdb中scheme_simulation_result这个bucket的数据到csv中 :param scheme_Type: 查询的方案类型 - :param scheme_Name: 查询的方案名 + :param scheme_name: 查询的方案名 :param query_date: 查询日期,格式为 'YYYY-MM-DD' :param bucket: 数据存储的 bucket 名称,默认值为 "SCADA_data" :return: @@ -4366,7 +4366,7 @@ def export_scheme_simulation_result_to_csv_scheme( flux_query_link = f""" from(bucket: "{bucket}") |> range(start: {start_time}, stop: {stop_time}) - |> filter(fn: (r) => r["_measurement"] == "link" and r["scheme_Type"] == "{scheme_Type}" and r["scheme_Name"] == "{scheme_Name}") + |> filter(fn: (r) => r["_measurement"] == "link" and r["scheme_Type"] == "{scheme_Type}" and r["scheme_name"] == "{scheme_name}") """ # 执行查询 link_tables = query_api.query(flux_query_link) @@ -4383,12 +4383,12 @@ def export_scheme_simulation_result_to_csv_scheme( link_data[key]["measurement"] = record.get_measurement() link_data[key]["date"] = record.values.get("date", None) link_data[key]["scheme_Type"] = record.values.get("scheme_Type", None) - link_data[key]["scheme_Name"] = record.values.get("scheme_Name", None) + link_data[key]["scheme_name"] = record.values.get("scheme_name", None) # 构建 Flux 查询语句,查询指定时间范围内的数据 flux_query_node = f""" from(bucket: "{bucket}") |> range(start: {start_time}, stop: {stop_time}) - |> filter(fn: (r) => r["_measurement"] == "node" and r["scheme_Type"] == "{scheme_Type}" and r["scheme_Name"] == "{scheme_Name}") + |> filter(fn: (r) => r["_measurement"] == "node" and r["scheme_Type"] == "{scheme_Type}" and r["scheme_name"] == "{scheme_name}") """ # 执行查询 node_tables = query_api.query(flux_query_node) @@ -4405,7 +4405,7 @@ def export_scheme_simulation_result_to_csv_scheme( node_data[key]["measurement"] = record.get_measurement() node_data[key]["date"] = record.values.get("date", None) node_data[key]["scheme_Type"] = record.values.get("scheme_Type", None) - node_data[key]["scheme_Name"] = record.values.get("scheme_Name", None) + node_data[key]["scheme_name"] = record.values.get("scheme_name", None) for key in set(link_data.keys()): row = {"time": key[0], "ID": key[1]} row.update(link_data.get(key, {})) @@ -4416,10 +4416,10 @@ def export_scheme_simulation_result_to_csv_scheme( node_rows.append(row) # 动态生成 CSV 文件名 csv_filename_link = ( - f"scheme_simulation_link_result_{scheme_Name}_of_{scheme_Type}.csv" + f"scheme_simulation_link_result_{scheme_name}_of_{scheme_Type}.csv" ) csv_filename_node = ( - f"scheme_simulation_node_result_{scheme_Name}_of_{scheme_Type}.csv" + f"scheme_simulation_node_result_{scheme_name}_of_{scheme_Type}.csv" ) # 写入到 CSV 文件 with open(csv_filename_link, mode="w", newline="") as file: @@ -4430,7 +4430,7 @@ def export_scheme_simulation_result_to_csv_scheme( "measurement", "date", "scheme_Type", - "scheme_Name", + "scheme_name", "ID", "flow", "leakage", @@ -4453,7 +4453,7 @@ def export_scheme_simulation_result_to_csv_scheme( "measurement", "date", "scheme_Type", - "scheme_Name", + "scheme_name", "ID", "head", "pressure", @@ -4878,15 +4878,15 @@ if __name__ == "__main__": # export_scheme_simulation_result_to_csv_time(start_date='2025-02-13', end_date='2025-02-15') # 示例9:export_scheme_simulation_result_to_csv_scheme - # export_scheme_simulation_result_to_csv_scheme(scheme_Type='burst_Analysis', scheme_Name='scheme1', query_date='2025-03-10') + # export_scheme_simulation_result_to_csv_scheme(scheme_Type='burst_Analysis', scheme_name='scheme1', query_date='2025-03-10') # 示例10:query_scheme_all_record_by_time - # node_records, link_records = query_scheme_all_record_by_time(scheme_Type='burst_Analysis', scheme_Name='scheme1', query_time="2025-02-14T10:30:00+08:00") + # node_records, link_records = query_scheme_all_record_by_time(scheme_Type='burst_Analysis', scheme_name='scheme1', query_time="2025-02-14T10:30:00+08:00") # print("Node 数据:", node_records) # print("Link 数据:", link_records) # 示例11:query_scheme_curve_by_ID_property - # curve_result = query_scheme_curve_by_ID_property(scheme_Type='burst_Analysis', scheme_Name='scheme1', ID='ZBBDTZDP000022', + # curve_result = query_scheme_curve_by_ID_property(scheme_Type='burst_Analysis', scheme_name='scheme1', ID='ZBBDTZDP000022', # type='node', property='head') # print(curve_result) @@ -4896,7 +4896,7 @@ if __name__ == "__main__": # print("Link 数据:", link_records) # 示例13:query_scheme_all_record - # node_records, link_records = query_scheme_all_record(scheme_Type='burst_Analysis', scheme_Name='scheme1', query_date='2025-03-10') + # node_records, link_records = query_scheme_all_record(scheme_Type='burst_Analysis', scheme_name='scheme1', query_date='2025-03-10') # print("Node 数据:", node_records) # print("Link 数据:", link_records) @@ -4909,16 +4909,16 @@ if __name__ == "__main__": # print(result_records) # 示例16:query_scheme_all_record_by_time_property - # result_records = query_scheme_all_record_by_time_property(scheme_Type='burst_Analysis', scheme_Name='scheme1', + # result_records = query_scheme_all_record_by_time_property(scheme_Type='burst_Analysis', scheme_name='scheme1', # query_time='2025-02-14T10:30:00+08:00', type='node', property='head') # print(result_records) # 示例17:query_scheme_all_record_property - # result_records = query_scheme_all_record_property(scheme_Type='burst_Analysis', scheme_Name='scheme1', query_date='2025-03-10', type='node', property='head') + # result_records = query_scheme_all_record_property(scheme_Type='burst_Analysis', scheme_name='scheme1', query_date='2025-03-10', type='node', property='head') # print(result_records) # 示例18:fill_scheme_simulation_result_to_SCADA - # fill_scheme_simulation_result_to_SCADA(scheme_Type='burst_Analysis', scheme_Name='burst0330', query_date='2025-03-30') + # fill_scheme_simulation_result_to_SCADA(scheme_Type='burst_Analysis', scheme_name='burst0330', query_date='2025-03-30') # 示例19:query_SCADA_data_by_device_ID_and_timerange # result = query_SCADA_data_by_device_ID_and_timerange(query_ids_list=globals.pressure_non_realtime_ids, start_time='2025-04-16T00:00:00+08:00', @@ -4926,7 +4926,7 @@ if __name__ == "__main__": # print(result) # 示例:manually_get_burst_flow - # leakage = manually_get_burst_flow(scheme_Type='burst_Analysis', scheme_Name='burst_scheme', scheme_start_time='2025-03-10T12:00:00+08:00') + # leakage = manually_get_burst_flow(scheme_Type='burst_Analysis', scheme_name='burst_scheme', scheme_start_time='2025-03-10T12:00:00+08:00') # print(leakage) # 示例:upload_cleaned_SCADA_data_to_influxdb diff --git a/app/services/simulation.py b/app/services/simulation.py index a0f8fdf..0bc891c 100644 --- a/app/services/simulation.py +++ b/app/services/simulation.py @@ -1249,7 +1249,7 @@ def run_simulation( endtime = time.time() logging.info("store time: %f", endtime - starttime) # 暂不需要再次存储 SCADA 模拟信息 - # TimescaleInternalQueries.fill_scheme_simulation_result_to_SCADA(scheme_Type=scheme_Type, scheme_Name=scheme_Name) + # TimescaleInternalQueries.fill_scheme_simulation_result_to_SCADA(scheme_Type=scheme_Type, scheme_name=scheme_name) # if simulation_type.upper() == "REALTIME": # influxdb_api.store_realtime_simulation_result_to_influxdb( @@ -1262,10 +1262,10 @@ def run_simulation( # modify_pattern_start_time, # num_periods_result, # scheme_Type, - # scheme_Name, + # scheme_name, # ) # 暂不需要再次存储 SCADA 模拟信息 - # influxdb_api.fill_scheme_simulation_result_to_SCADA(scheme_Type=scheme_Type, scheme_Name=scheme_Name) + # influxdb_api.fill_scheme_simulation_result_to_SCADA(scheme_Type=scheme_Type, scheme_name=scheme_name) print("after store result") @@ -1345,7 +1345,7 @@ if __name__ == "__main__": # run_simulation(name='bb', simulation_type="realtime", modify_pattern_start_time='2025-02-25T23:45:00+08:00') # 模拟示例2 # run_simulation(name='bb', simulation_type="extended", modify_pattern_start_time='2025-03-10T12:00:00+08:00', - # modify_total_duration=1800, scheme_Type="burst_Analysis", scheme_Name="scheme1") + # modify_total_duration=1800, scheme_Type="burst_Analysis", scheme_name="scheme1") # 查询示例1:query_SCADA_ID_corresponding_info # result = query_SCADA_ID_corresponding_info(name='bb', SCADA_ID='P10755') diff --git a/app/services/valve_isolation.py b/app/services/valve_isolation.py index 0d3d101..dc41c05 100644 --- a/app/services/valve_isolation.py +++ b/app/services/valve_isolation.py @@ -4,6 +4,8 @@ from app.algorithms.valve_isolation import valve_isolation_analysis def analyze_valve_isolation( - network: str, accident_element: str | list[str] + network: str, + accident_element: str | list[str], + disabled_valves: list[str] = None, ) -> dict[str, Any]: - return valve_isolation_analysis(network, accident_element) + return valve_isolation_analysis(network, accident_element, disabled_valves) diff --git a/scripts/main.py b/scripts/main.py index 2541bc4..4a71ed8 100644 --- a/scripts/main.py +++ b/scripts/main.py @@ -3233,7 +3233,7 @@ async def fastapi_query_all_scheme_all_records( return loaded_dict results = influxdb_api.query_scheme_all_record( - scheme_Type=schemetype, scheme_Name=schemename, query_date=querydate + scheme_Type=schemetype, scheme_name=schemename, query_date=querydate ) packed = msgpack.packb(results, default=encode_datetime) redis_client.set(cache_key, packed) @@ -3257,7 +3257,7 @@ async def fastapi_query_all_scheme_all_records_property( all_results = msgpack.unpackb(data, object_hook=decode_datetime) else: all_results = influxdb_api.query_scheme_all_record( - scheme_Type=schemetype, scheme_Name=schemename, query_date=querydate + scheme_Type=schemetype, scheme_name=schemename, query_date=querydate ) packed = msgpack.packb(all_results, default=encode_datetime) redis_client.set(cache_key, packed) @@ -3585,7 +3585,7 @@ class BurstAnalysis(BaseModel): modify_fixed_pump_pattern: Optional[dict[str, list]] = None modify_variable_pump_pattern: Optional[dict[str, list]] = None modify_valve_opening: Optional[dict[str, float]] = None - scheme_Name: Optional[str] = None + scheme_name: Optional[str] = None @app.post("/burst_analysis/") @@ -3608,7 +3608,7 @@ async def fastapi_burst_analysis(data: BurstAnalysis) -> str: modify_fixed_pump_pattern=item["modify_fixed_pump_pattern"], modify_variable_pump_pattern=item["modify_variable_pump_pattern"], modify_valve_opening=item["modify_valve_opening"], - scheme_Name=item["scheme_Name"], + scheme_name=item["scheme_name"], ) # os.rename(filename2, filename) @@ -3616,7 +3616,7 @@ async def fastapi_burst_analysis(data: BurstAnalysis) -> str: # 将 时间转换成日期,然后缓存这个计算结果 # 缓存key: burst_analysis__ global redis_client - schemename = data.scheme_Name + schemename = data.scheme_name print(data.modify_pattern_start_time) @@ -3627,7 +3627,7 @@ async def fastapi_burst_analysis(data: BurstAnalysis) -> str: cache_key = f"queryallschemeallrecords_burst_Analysis_{schemename}_{querydate}" data = redis_client.get(cache_key) if not data: - results = influxdb_api.query_scheme_all_record("burst_Analysis", scheme_Name=schemename, query_date=querydate) + results = influxdb_api.query_scheme_all_record("burst_Analysis", scheme_name=schemename, query_date=querydate) packed = msgpack.packb(results, default=encode_datetime) redis_client.set(cache_key, packed) """ @@ -3712,7 +3712,7 @@ async def fastapi_contaminant_simulation( concentration: float, duration: int, pattern: str = None, - scheme_Name: str = None, + scheme_name: str = None, ) -> str: filename = "c:/lock.simulation" filename2 = "c:/lock.simulation2" diff --git a/scripts/online_Analysis.py b/scripts/online_Analysis.py index 84ee28d..d3e1dbf 100644 --- a/scripts/online_Analysis.py +++ b/scripts/online_Analysis.py @@ -68,7 +68,7 @@ def burst_analysis( :param modify_fixed_pump_pattern: dict中包含多个水泵模式,str为工频水泵的id,list为修改后的pattern :param modify_variable_pump_pattern: dict中包含多个水泵模式,str为变频水泵的id,list为修改后的pattern :param modify_valve_opening: dict中包含多个阀门开启度,str为阀门的id,float为修改后的阀门开启度 - :param scheme_Name: 方案名称 + :param scheme_name: 方案名称 :return: """ scheme_detail: dict = { @@ -294,7 +294,7 @@ def flushing_analysis( modify_valve_opening: dict[str, float] = None, drainage_node_ID: str = None, flushing_flow: float = 0, - scheme_Name: str = None, + scheme_name: str = None, ) -> None: """ 管道冲洗模拟 @@ -304,7 +304,7 @@ def flushing_analysis( :param modify_valve_opening: dict中包含多个阀门开启度,str为阀门的id,float为修改后的阀门开启度 :param drainage_node_ID: 冲洗排放口所在节点ID :param flushing_flow: 冲洗水量,传入参数单位为m3/h - :param scheme_Name: 方案名称 + :param scheme_name: 方案名称 :return: """ print( @@ -397,7 +397,7 @@ def flushing_analysis( modify_total_duration=modify_total_duration, modify_valve_opening=modify_valve_opening, scheme_Type="flushing_Analysis", - scheme_Name=scheme_Name, + scheme_name=scheme_name, ) # step 4. restore the base model if is_project_open(new_name): @@ -417,7 +417,7 @@ def contaminant_simulation( source: str = None,# 污染源节点ID concentration: float = None, # 污染源浓度,单位mg/L source_pattern: str = None, # 污染源时间变化模式名称 - scheme_Name: str = None, + scheme_name: str = None, ) -> None: """ 污染模拟 @@ -428,7 +428,7 @@ def contaminant_simulation( :param concentration: 污染源位置处的浓度,单位mg/L,即默认的污染模拟setting为concentration(应改为 Set point booster) :param source_pattern: 污染源的时间变化模式,若不传入则默认以恒定浓度持续模拟,时间长度等于duration; 若传入,则格式为{1.0,0.5,1.1}等系数列表pattern_step模拟等于模型的hydraulic time step - :param scheme_Name: 方案名称 + :param scheme_name: 方案名称 :return: """ print( @@ -534,7 +534,7 @@ def contaminant_simulation( modify_pattern_start_time=modify_pattern_start_time, modify_total_duration=modify_total_duration, scheme_Type="contaminant_Analysis", - scheme_Name=scheme_Name, + scheme_name=scheme_name, ) # for i in range(1,operation_step): @@ -630,7 +630,7 @@ def pressure_regulation( modify_tank_initial_level: dict[str, float] = None, modify_fixed_pump_pattern: dict[str, list] = None, modify_variable_pump_pattern: dict[str, list] = None, - scheme_Name: str = None, + scheme_name: str = None, ) -> None: """ 区域调压模拟,用来模拟未来15分钟内,开关水泵对区域压力的影响 @@ -640,7 +640,7 @@ def pressure_regulation( :param modify_tank_initial_level: dict中包含多个水塔,str为水塔的id,float为修改后的initial_level :param modify_fixed_pump_pattern: dict中包含多个水泵模式,str为工频水泵的id,list为修改后的pattern :param modify_variable_pump_pattern: dict中包含多个水泵模式,str为变频水泵的id,list为修改后的pattern - :param scheme_Name: 模拟方案名称 + :param scheme_name: 模拟方案名称 :return: """ print( @@ -693,7 +693,7 @@ def pressure_regulation( modify_fixed_pump_pattern=modify_fixed_pump_pattern, modify_variable_pump_pattern=modify_variable_pump_pattern, scheme_Type="pressure_regulation", - scheme_Name=scheme_Name, + scheme_name=scheme_name, ) if is_project_open(new_name): close_project(new_name) @@ -1536,7 +1536,7 @@ if __name__ == "__main__": # 示例1:burst_analysis # burst_analysis(name='bb', modify_pattern_start_time='2025-04-17T00:00:00+08:00', - # burst_ID='GSD230112144241FA18292A84CB', burst_size=400, modify_total_duration=1800, scheme_Name='GSD230112144241FA18292A84CB_400') + # burst_ID='GSD230112144241FA18292A84CB', burst_size=400, modify_total_duration=1800, scheme_name='GSD230112144241FA18292A84CB_400') # 示例:create_user # create_user(name=project_info.name, username='tjwater dev', password='123456')