78 lines
2.6 KiB
Python
78 lines
2.6 KiB
Python
from .database import *
|
|
from .s0_base import *
|
|
|
|
def get_pipe_risk_probability_now(name: str, pipe_id: str) -> dict[str, Any]:
|
|
t = try_read(name, f"select * from pipe_risk_probability where pipeid = '{pipe_id}'")
|
|
if t == None:
|
|
return {}
|
|
|
|
d = {}
|
|
d['pipeid'] = str(t['pipeid'])
|
|
d['pipeage'] = t['pipeage']
|
|
d['risk_probability_now'] = t['risk_probability_now']
|
|
|
|
return d
|
|
|
|
def get_pipe_risk_probability(name: str, pipe_id: str) -> dict[str, Any]:
|
|
t = try_read(name, f"select * from pipe_risk_probability where pipeid = '{pipe_id}'")
|
|
if t == None:
|
|
return {}
|
|
|
|
d = {}
|
|
d['pipeid'] = t['pipeid']
|
|
d['x'] = t['x']
|
|
d['y'] = t['y']
|
|
|
|
return d
|
|
|
|
def get_network_pipe_risk_probability_now(name: str) -> list[dict[str, Any]]:
|
|
pipe_risk_probability_list = []
|
|
with conn[name].cursor(row_factory=dict_row) as cur:
|
|
cur.execute(f"select * from pipe_risk_probability")
|
|
for record in cur:
|
|
#pipe_risk_probability_list.append(record)
|
|
t = {}
|
|
t['pipeid'] = record['pipeid']
|
|
t['pipeage'] = record['pipeage']
|
|
t['risk_probability_now'] = record['risk_probability_now']
|
|
pipe_risk_probability_list.append(t)
|
|
|
|
return pipe_risk_probability_list
|
|
|
|
def get_pipes_risk_probability(name: str, pipe_ids: list[str]) -> list[dict[str, Any]]:
|
|
pipe_risk_probability_list = []
|
|
with conn[name].cursor(row_factory=dict_row) as cur:
|
|
cur.execute(f"select * from pipe_risk_probability")
|
|
for record in cur:
|
|
if record['pipeid'] in pipe_ids:
|
|
t = {}
|
|
t['pipeid'] = record['pipeid']
|
|
t['x'] = record['x']
|
|
t['y'] = record['y']
|
|
pipe_risk_probability_list.append(t)
|
|
|
|
return pipe_risk_probability_list
|
|
|
|
# node : {}, link : {}
|
|
def get_pipe_risk_probability_geometries(name: str) -> dict[str, Any]:
|
|
pipe_risk_probability_geometries = {}
|
|
|
|
key_pipeId = '编码'
|
|
# key_startnode = '上游节点'
|
|
# key_endnode = '下游节点'
|
|
key_geometry = 'geometry'
|
|
|
|
with conn[name].cursor(row_factory=dict_row) as cur:
|
|
cur.execute(f"select {key_pipeId}, ST_AsGeoJSON(geometry) AS {key_geometry} from gis_pipe")
|
|
for record in cur:
|
|
id = record[key_pipeId]
|
|
geom = json.loads(record[key_geometry])
|
|
print(geom)
|
|
print(geom['coordinates'])
|
|
|
|
pipe_risk_probability_geometries[id] = {
|
|
'start': geom['coordinates'][0],
|
|
'end': geom['coordinates'][1]
|
|
}
|
|
|
|
return pipe_risk_probability_geometries |