Files
TJWaterAgent/.opencode/skills/workflow/source-service-area-analysis/scripts/compute_service_areas.py
T

282 lines
9.5 KiB
Python

#!/usr/bin/env python3
"""
compute_service_areas.py — 水源服务范围分析核心脚本
输入:
sim_result.json — runprojectreturndict 的水力模拟结果
pipes.json — getallpipeproperties 的管道属性列表
reservoirs.json — getallreservoirproperties 的水库属性列表
输出:
service_area_result.json — 包含 node_area_map 和统计信息
算法:
1. 从 pipes 构建无向拓扑图 (node1 <-> node2)
2. 从 sim link_results 确定流向,生成有向图
3. 从 reservoirs 获取所有水源节点
4. 多源 BFS 下游遍历,首次访问即归属性
5. 生成渲染数据与统计
用法:
python compute_service_areas.py sim_result.json pipes.json reservoirs.json \
--output service_area_result.json --network tjwater
"""
import argparse
import json
import os
import sys
from collections import deque
# ── 高区分度 20 色调色板 ──────────────────────────────────────────
PALETTE = [
"#1f77b4", "#ff7f0e", "#2ca02c", "#d62728", "#9467bd",
"#8c564b", "#e377c2", "#7f7f7f", "#bcbd22", "#17becf",
"#aec7e8", "#ffbb78", "#98df8a", "#ff9896", "#c5b0d5",
"#c49c94", "#f7b6d2", "#c7c7c7", "#dbdb8d", "#9edae5",
]
def load_json(path: str) -> dict | list:
"""加载 JSON 文件,支持引用格式(含 result_ref 字段)或列表。"""
with open(path, "r", encoding="utf-8") as f:
data = json.load(f)
# 如果顶层是引用包装,展开到内部的列表
if isinstance(data, dict) and "data" in data:
data = data["data"]
if isinstance(data, dict) and "items" in data:
data = data["items"]
return data
def build_undirected_graph(pipes: list) -> tuple[dict, dict]:
"""
从管道列表构建无向拓扑图。
返回:
adj: { node_id: set(neighbor_node_ids) }
edge_map: { (node1, node2): pipe_id } 用于查找管道 ID
"""
adj = {}
edge_map = {}
for p in pipes:
pipe_id = str(p.get("id", p.get("pipe_id", p.get("name", ""))))
n1 = str(p.get("node1", ""))
n2 = str(p.get("node2", ""))
if not n1 or not n2:
continue
adj.setdefault(n1, set()).add(n2)
adj.setdefault(n2, set()).add(n1)
edge_map[(n1, n2)] = pipe_id
edge_map[(n2, n1)] = pipe_id
return adj, edge_map
def build_directed_graph(adj: dict, edge_map: dict, link_results: list) -> dict:
"""
基于模拟流向构建有向图。
link_results 中每条记录含 link_id, flow, status。
流向判定:
flow > 0 → node1 → node2
flow < 0 → node2 → node1
flow ≈ 0 或 status != "Open" → 忽略(视为断连)
注意: link_results 中没有 node1/node2,需要通过 link_id 反查 edge_map。
由于 edge_map 只有 (node1,node2) → pipe_id 的映射,
这里需要先建立 pipe_id → (node1, node2) 的反向索引。
"""
# 建立 pipe_id → (node1, node2) 反向索引
# 注意:edge_map 含有 (n1,n2) 和 (n2,n1) 两个方向,只保留第一个以避免反向覆写
pipe_to_nodes = {}
for (n1, n2), pid in edge_map.items():
if pid not in pipe_to_nodes:
pipe_to_nodes[pid] = (n1, n2)
digraph = {}
missing = 0
closed_or_zero = 0
for link in link_results:
lid = str(link.get("link_id", link.get("id", "")))
flow = link.get("flow", 0)
status = str(link.get("status", "Open"))
# 忽略非 Open 管段和零流量管段(大小写不敏感)
if status.upper() != "OPEN":
closed_or_zero += 1
continue
if abs(flow) < 1e-6:
closed_or_zero += 1
continue
# 查找管段对应的节点对
pair = pipe_to_nodes.get(lid)
if pair is None:
missing += 1
continue
n1, n2 = pair
if flow > 0:
digraph.setdefault(n1, set()).add(n2)
else:
digraph.setdefault(n2, set()).add(n1)
if missing > 0:
print(f"[WARN] {missing} 条模拟管段未在管道拓扑中找到对应节点对", file=sys.stderr)
if closed_or_zero > 0:
print(f"[INFO] 忽略 {closed_or_zero} 条 Closed/零流量管段", file=sys.stderr)
return digraph
def bfs_from_sources(sources: list, digraph: dict) -> dict:
"""
多水源 BFS 下游遍历。首次访问的节点归属到对应水源。
返回: { node_id: source_id }
"""
node_area = {}
queue = deque()
# 初始化:所有水源入队,归属自身
for src in sources:
node_area[src] = src
queue.append(src)
while queue:
cur = queue.popleft()
cur_source = node_area[cur]
for nb in digraph.get(cur, set()):
if nb not in node_area:
node_area[nb] = cur_source
queue.append(nb)
return node_area
def generate_render_data(node_area: dict, sources: list) -> dict:
"""
生成 render_junctions 所需的渲染数据。
返回:
node_area_map: { node_id: area_id }
area_ids: 水源 ID 列表
area_colors: { area_id: color_hex }
"""
# 为每个水源分配颜色
area_colors = {}
for i, src in enumerate(sources):
area_colors[src] = PALETTE[i % len(PALETTE)]
return {
"node_area_map": node_area,
"area_ids": sources,
"area_colors": area_colors,
}
def compute_stats(node_area: dict, sources: list, all_nodes: set) -> dict:
"""计算各水源服务节点数和覆盖率统计。"""
source_counts = {}
for src in sources:
source_counts[src] = sum(1 for v in node_area.values() if v == src)
total = len(all_nodes)
assigned = len(node_area)
unassigned = total - assigned
coverage = (assigned / total * 100) if total > 0 else 0
return {
"total_nodes": total,
"assigned_nodes": assigned,
"unassigned_nodes": unassigned,
"coverage_pct": round(coverage, 2),
"source_counts": source_counts,
}
def main():
parser = argparse.ArgumentParser(
description="水源服务范围分析 — 基于水力模拟流向的多源 BFS 分区"
)
parser.add_argument("sim_file", help="runprojectreturndict 模拟结果 JSON")
parser.add_argument("pipes_file", help="getallpipeproperties 管道属性 JSON")
parser.add_argument("reservoirs_file", help="getallreservoirproperties 水库属性 JSON")
parser.add_argument("--output", "-o", default="/tmp/opencode/service_area_result.json",
help="输出 JSON 文件路径")
parser.add_argument("--network", "-n", default="tjwater",
help="管网名称(仅用于报告标注)")
args = parser.parse_args()
# ── 1. 加载数据 ──
print(f"[INFO] 加载模拟结果: {args.sim_file}")
sim_data = load_json(args.sim_file)
link_results = sim_data.get("link_results", [])
print(f"[INFO] 加载管道属性: {args.pipes_file}")
pipes = load_json(args.pipes_file)
print(f"[INFO] 加载水库列表: {args.reservoirs_file}")
reservoirs = load_json(args.reservoirs_file)
# 提取水源 ID 列表
sources = []
if isinstance(reservoirs, list):
sources = [str(r.get("id", r.get("name", ""))) for r in reservoirs if r.get("id") or r.get("name")]
elif isinstance(reservoirs, dict):
# 可能是 { data: [...] } 格式
for key in ["data", "items", "reservoirs"]:
if key in reservoirs and isinstance(reservoirs[key], list):
sources = [str(r.get("id", r.get("name", ""))) for r in reservoirs[key] if r.get("id") or r.get("name")]
break
if not sources:
# 也可能是 { "551656": {...}, ... } 格式
sources = [str(k) for k in reservoirs if isinstance(reservoirs[k], dict)]
if not sources:
print("[ERROR] 未能从水库数据中提取水源 ID 列表", file=sys.stderr)
sys.exit(1)
print(f"[INFO] 水源数: {len(sources)}")
print(f"[INFO] 水源列表: {sources}")
# ── 2. 构建无向图 ──
adj, edge_map = build_undirected_graph(pipes)
all_nodes = set(adj.keys())
# ── 3. 构建有向图 ──
digraph = build_directed_graph(adj, edge_map, link_results)
# ── 4. BFS 下游遍历 ──
print("[INFO] 执行多源 BFS 下游遍历...")
node_area = bfs_from_sources(sources, digraph)
# ── 5. 统计 ──
stats = compute_stats(node_area, sources, all_nodes)
print(f"[INFO] 总节点数: {stats['total_nodes']}")
print(f"[INFO] 已分配: {stats['assigned_nodes']} ({stats['coverage_pct']:.1f}%)")
print(f"[INFO] 未分配: {stats['unassigned_nodes']}")
print("\n各水源服务节点数 (按数量降序):")
for src, count in sorted(stats["source_counts"].items(), key=lambda x: -x[1]):
pct = (count / stats["assigned_nodes"] * 100) if stats["assigned_nodes"] > 0 else 0
print(f" {src}: {count} ({pct:.1f}%)")
# ── 6. 生成渲染数据 ──
render = generate_render_data(node_area, sources)
# ── 7. 输出 ──
output = {
"network": args.network,
"stats": stats,
"render": render,
"sources": sources,
}
os.makedirs(os.path.dirname(args.output) or ".", exist_ok=True)
with open(args.output, "w", encoding="utf-8") as f:
json.dump(output, f, ensure_ascii=False, indent=2)
print(f"\n[OK] 结果已写入: {args.output}")
if __name__ == "__main__":
main()