Files
TJWaterServer/api/s0_base.py
2025-04-17 21:18:30 +08:00

276 lines
7.7 KiB
Python

from psycopg.rows import dict_row, Row
from .connection import g_conn_dict as conn
from .database import read
_NODE = '_node'
_LINK = '_link'
_CURVE = '_curve'
_PATTERN = '_pattern'
_REGION = '_region'
JUNCTION = 'junction'
RESERVOIR = 'reservoir'
TANK = 'tank'
PIPE = 'pipe'
PUMP = 'pump'
VALVE = 'valve'
PATTERN = 'pattern'
CURVE = 'curve'
REGION = 'region'
# DingZQ, 2025-02-05
'''
C++ 代码里已经定义了这些 enum 值
{
kNothing = -1,
//Node
kReservoir = 0,
kTank,
kJunction,
//Link
kPipe,
kPump,
kValve,
'''
ELEMENT_TYPES : dict[str, int] = {
RESERVOIR : 0,
TANK : 1,
JUNCTION : 2,
PIPE : 3,
PUMP : 4,
VALVE : 5,
}
def _get_from(name: str, id: str, base_type: str) -> Row | None:
with conn[name].cursor(row_factory=dict_row) as cur:
cur.execute(f"select * from {base_type} where id = '{id}'")
return cur.fetchone()
def is_node(name: str, id: str) -> bool:
return _get_from(name, id, _NODE) != None
def is_junction(name: str, id: str) -> bool:
row = _get_from(name, id, _NODE)
return row != None and row['type'] == JUNCTION
def is_reservoir(name: str, id: str) -> bool:
row = _get_from(name, id, _NODE)
return row != None and row['type'] == RESERVOIR
def is_tank(name: str, id: str) -> bool:
row = _get_from(name, id, _NODE)
return row != None and row['type'] == TANK
def is_link(name: str, id: str) -> bool:
return _get_from(name, id, _LINK) != None
def is_pipe(name: str, id: str) -> bool:
row = _get_from(name, id, _LINK)
return row != None and row['type'] == PIPE
def is_pump(name: str, id: str) -> bool:
row = _get_from(name, id, _LINK)
return row != None and row['type'] == PUMP
def is_valve(name: str, id: str) -> bool:
row = _get_from(name, id, _LINK)
return row != None and row['type'] == VALVE
# DingZQ, 2025-02-05
def get_node_type(name: str, node_id: str) -> str:
row = _get_from(name, node_id, _NODE)
return row['type']
def get_link_type(name: str, link_id: str) -> str:
row = _get_from(name, link_id, _LINK)
return row['type']
def get_element_type(name: str, element_id: str) -> str:
if is_node(name, element_id):
return get_node_type(name, element_id)
elif is_link(name, element_id):
return get_link_type(name, element_id)
else:
return None
def get_element_type_value(name: str, element_id: str) -> int:
return ELEMENT_TYPES[get_element_type(name, element_id)]
def is_curve(name: str, id: str) -> bool:
return _get_from(name, id, _CURVE) != None
def is_pattern(name: str, id: str) -> bool:
return _get_from(name, id, _PATTERN) != None
def is_region(name: str, id: str) -> bool:
return _get_from(name, id, _REGION) != None
def _get_all(name: str, base_type: str) -> list[str]:
ids : list[str] = []
with conn[name].cursor(row_factory=dict_row) as cur:
cur.execute(f"select id from {base_type} order by id")
for record in cur:
ids.append(record['id'])
return ids
def get_nodes(name: str) -> list[str]:
return _get_all(name, _NODE)
# DingZQ
def _get_nodes_by_type(name: str, type: str) -> list[str]:
ids : list[str] = []
with conn[name].cursor(row_factory=dict_row) as cur:
cur.execute(f"select id from {_NODE} where type = '{type}' order by id")
for record in cur:
ids.append(record['id'])
return ids
# DingZQ
def get_nodes_id_and_type(name: str) -> dict[str, str]:
nodes_id_and_type: dict[str, str] = {}
with conn[name].cursor(row_factory=dict_row) as cur:
cur.execute(f"select id, type from {_NODE} order by id")
for record in cur:
nodes_id_and_type[record['id']] = record['type']
return nodes_id_and_type
# DingZQ 2024-12-31
def get_major_nodes(name: str, diameter: int) -> list[str]:
major_nodes_set = set()
with conn[name].cursor(row_factory=dict_row) as cur:
cur.execute(f"select node1, node2 from pipes where diameter > {diameter}")
for record in cur:
major_nodes_set.add(record['node1'])
major_nodes_set.add(record['node2'])
return list(major_nodes_set)
# DingZQs
def get_junctions(name: str) -> list[str]:
return _get_nodes_by_type(name, JUNCTION)
# DingZQ
def get_reservoirs(name: str) -> list[str]:
return _get_nodes_by_type(name, RESERVOIR)
# DingZQ
def get_tanks(name: str) -> list[str]:
return _get_nodes_by_type(name, TANK)
# DingZQ
def get_links(name: str) -> list[str]:
return _get_all(name, _LINK)
# DingZQ
def _get_links_by_type(name: str, type: str) -> list[str]:
ids : list[str] = []
with conn[name].cursor(row_factory=dict_row) as cur:
cur.execute(f"select id from {_LINK} where type = '{type}' order by id")
for record in cur:
ids.append(record['id'])
return ids
# DingZQ
def get_links_id_and_type(name: str) -> dict[str, str]:
links_id_and_type: dict[str, str] = {}
with conn[name].cursor(row_factory=dict_row) as cur:
cur.execute(f"select id, type from {_LINK} order by id")
for record in cur:
links_id_and_type[record['id']] = record['type']
return links_id_and_type
# DingZQ 2024-12-31
# 获取直径大于800的管道
def get_major_pipes(name: str, diameter: int) -> list[str]:
major_pipe_ids: list[str] = []
with conn[name].cursor(row_factory=dict_row) as cur:
cur.execute(f"select id from pipes where diameter > {diameter} order by id")
for record in cur:
major_pipe_ids.append(record['id'])
return major_pipe_ids
# DingZQ
def get_pipes(name: str) -> list[str]:
return _get_links_by_type(name, PIPE)
# DingZQ
def get_pumps(name: str) -> list[str]:
return _get_links_by_type(name, PUMP)
# DingZQ
def get_valves(name: str) -> list[str]:
return _get_links_by_type(name, VALVE)
def get_curves(name: str) -> list[str]:
return _get_all(name, _CURVE)
def get_patterns(name: str) -> list[str]:
return _get_all(name, _PATTERN)
def get_regions(name: str) -> list[str]:
return _get_all(name, _REGION)
def get_node_links(name: str, id: str) -> list[str]:
with conn[name].cursor(row_factory=dict_row) as cur:
links: list[str] = []
for p in cur.execute(f"select id from pipes where node1 = '{id}' or node2 = '{id}'").fetchall():
links.append(p['id'])
for p in cur.execute(f"select id from pumps where node1 = '{id}' or node2 = '{id}'").fetchall():
links.append(p['id'])
for p in cur.execute(f"select id from valves where node1 = '{id}' or node2 = '{id}'").fetchall():
links.append(p['id'])
return links
def get_link_nodes(name: str, id: str) -> list[str]:
row = {}
if is_pipe(name, id):
row = read(name, f"select node1, node2 from pipes where id = '{id}'")
elif is_pump(name, id):
row = read(name, f"select node1, node2 from pumps where id = '{id}'")
elif is_valve(name, id):
row = read(name, f"select node1, node2 from valves where id = '{id}'")
return [str(row['node1']), str(row['node2'])]
def get_region_type(name: str, id: str)->str:
if(is_region(name,id)):
type = read(name, f"select type from _region where id = '{id}'")
return type
def get_network_pipe_risk_probability_now(name: str) -> dict[str, Any]:
pipe_risk_probability_list = []
with conn[name].cursor(row_factory=dict_row) as cur:
cur.execute(f"select * from pipe_risk_probability")
for record in cur:
#pipe_risk_probability_list.append(record)
t = {}
t['pipeid'] = record['pipeid']
t['pipeage'] = record['pipeage']
t['risk_probability_now'] = record['risk_probability_now']
pipe_risk_probability_list.append(t)
return pipe_risk_probability_list