215 lines
8.1 KiB
Python
215 lines
8.1 KiB
Python
from .database import *
|
|
from .s0_base import *
|
|
|
|
|
|
# Status values stored in the `status` column of the `pipes` table.
PIPE_STATUS_OPEN = 'OPEN'
PIPE_STATUS_CLOSED = 'CLOSED'
# NOTE(review): presumably the EPANET "check valve" pipe status - confirm.
PIPE_STATUS_CV = 'CV'
|
|
|
|
|
|
def get_pipe_schema(name: str) -> dict[str, dict[str, Any]]:
    """Return the field schema of a pipe record.

    Args:
        name: Accepted for signature compatibility; this function does not
            read it.

    Returns:
        A freshly built mapping from field name to a descriptor with the
        keys ``'type'``, ``'optional'`` and ``'readonly'``. Every field is
        mandatory and only ``'id'`` is read-only.
    """
    def field(type_name: str, readonly: bool = False) -> dict[str, Any]:
        # Key order (type, optional, readonly) matches the original literal.
        return {'type': type_name, 'optional': False, 'readonly': readonly}

    return {
        'id': field('str', readonly=True),
        'node1': field('str'),
        'node2': field('str'),
        'length': field('float'),
        'diameter': field('float'),
        'roughness': field('float'),
        'minor_loss': field('float'),
        'status': field('str'),
    }
|
|
|
|
|
|
def get_pipe(name: str, id: str) -> dict[str, Any]:
    """Fetch one pipe row by id and return it as a plain dict.

    Args:
        name: Passed through to ``try_read`` as its first argument.
        id: Identifier of the pipe to look up.

    Returns:
        A dict with the keys ``id``, ``node1``, ``node2``, ``length``,
        ``diameter``, ``roughness``, ``minor_loss`` and ``status`` (text
        fields as ``str``, numeric fields as ``float``), or ``{}`` when
        ``try_read`` returns ``None``.
    """
    # Double embedded single quotes so the id cannot terminate the SQL
    # string literal it is spliced into.
    quoted_id = str(id).replace("'", "''")
    row = try_read(name, f"select * from pipes where id = '{quoted_id}'")
    if row is None:
        return {}

    return {
        'id': str(row['id']),
        'node1': str(row['node1']),
        'node2': str(row['node2']),
        'length': float(row['length']),
        'diameter': float(row['diameter']),
        'roughness': float(row['roughness']),
        'minor_loss': float(row['minor_loss']),
        'status': str(row['status']),
    }
|
|
|
|
# DingZQ, 2025-03-29
|
|
def get_all_pipes(name: str) -> list[dict[str, Any]]:
    """Return every row of the ``pipes`` table as a list of plain dicts.

    Args:
        name: Passed through to ``read_all`` as its first argument.

    Returns:
        One dict per pipe with the keys ``id``, ``node1``, ``node2``,
        ``length``, ``diameter``, ``roughness``, ``minor_loss`` and
        ``status``; an empty list when there are no rows.
    """
    # read_all is used here because the result is iterated row by row;
    # elsewhere in this module the result of try_read is indexed as a single
    # row, so iterating it would not yield rows.
    rows = read_all(name, "select * from pipes")
    if not rows:
        return []

    return [
        {
            'id': str(row['id']),
            'node1': str(row['node1']),
            'node2': str(row['node2']),
            'length': float(row['length']),
            'diameter': float(row['diameter']),
            'roughness': float(row['roughness']),
            'minor_loss': float(row['minor_loss']),
            'status': str(row['status']),
        }
        for row in rows
    ]
|
|
|
|
class Pipe(object):
    """One pipe record together with SQL-ready renderings of its fields.

    The plain attributes (``id``, ``node1``, ...) hold the coerced values.
    The ``f_*`` attributes hold the same values in the form in which they
    are spliced into SQL statements: text fields as single-quoted literals,
    numeric fields as floats.
    """

    def __init__(self, input: dict[str, Any]) -> None:
        """Build a pipe from a mapping that contains every pipe field.

        Args:
            input: Mapping with the keys ``id``, ``node1``, ``node2``,
                ``length``, ``diameter``, ``roughness``, ``minor_loss`` and
                ``status``.

        Raises:
            KeyError: If a field is missing from ``input``.
            ValueError: If a numeric field cannot be converted to ``float``.
        """
        # `input` shadows the builtin, but the name is part of the public
        # signature and is kept so keyword callers keep working.
        self.type = 'pipe'
        self.id = str(input['id'])
        self.node1 = str(input['node1'])
        self.node2 = str(input['node2'])
        self.length = float(input['length'])
        self.diameter = float(input['diameter'])
        self.roughness = float(input['roughness'])
        self.minor_loss = float(input['minor_loss'])
        self.status = str(input['status'])

        self.f_type = self._sql_text(self.type)
        self.f_id = self._sql_text(self.id)
        self.f_node1 = self._sql_text(self.node1)
        self.f_node2 = self._sql_text(self.node2)
        self.f_length = self.length
        self.f_diameter = self.diameter
        self.f_roughness = self.roughness
        self.f_minor_loss = self.minor_loss
        self.f_status = self._sql_text(self.status)

    @staticmethod
    def _sql_text(value: str) -> str:
        """Render ``value`` as a single-quoted SQL string literal."""
        # Embedded single quotes are doubled so the value cannot end the
        # literal early.
        return "'" + value.replace("'", "''") + "'"

    def as_dict(self) -> dict[str, Any]:
        """Return the type tag and all fields as a plain dict."""
        return {
            'type': self.type,
            'id': self.id,
            'node1': self.node1,
            'node2': self.node2,
            'length': self.length,
            'diameter': self.diameter,
            'roughness': self.roughness,
            'minor_loss': self.minor_loss,
            'status': self.status,
        }

    def as_id_dict(self) -> dict[str, Any]:
        """Return only the type tag and the id."""
        return {'type': self.type, 'id': self.id}
|
|
|
|
|
|
def _set_pipe(name: str, cs: ChangeSet) -> DbChangeSet:
    """Build the redo/undo pair for updating an existing pipe.

    Fields of the first operation in ``cs`` that the schema marks as
    writable overwrite the stored values; everything else keeps its stored
    value.

    Args:
        name: Passed through to ``get_pipe`` and ``get_pipe_schema``.
        cs: Change set whose first operation carries the pipe ``id`` and the
            fields to change.

    Returns:
        A ``DbChangeSet`` built from the redo SQL, the undo SQL and one
        redo and one undo change-set entry.
    """
    def update_sql(pipe: Pipe) -> str:
        # Same UPDATE template serves redo (new values) and undo (old values).
        return f"update pipes set node1 = {pipe.f_node1}, node2 = {pipe.f_node2}, length = {pipe.f_length}, diameter = {pipe.f_diameter}, roughness = {pipe.f_roughness}, minor_loss = {pipe.f_minor_loss}, status = {pipe.f_status} where id = {pipe.f_id};"

    operation = cs.operations[0]

    # Read the stored row once; the merged record starts as a copy of it so
    # old and new are guaranteed to derive from the same snapshot.
    stored = get_pipe(name, operation['id'])
    old = Pipe(stored)

    merged = dict(stored)
    for key, spec in get_pipe_schema(name).items():
        if key in operation and not spec['readonly']:
            merged[key] = operation[key]
    new = Pipe(merged)

    redo_sql = update_sql(new)
    undo_sql = update_sql(old)

    redo_cs = g_update_prefix | new.as_dict()
    undo_cs = g_update_prefix | old.as_dict()

    return DbChangeSet(redo_sql, undo_sql, [redo_cs], [undo_cs])
|
|
|
|
|
|
def set_pipe(name: str, cs: ChangeSet) -> ChangeSet:
    """Update an existing pipe from the first operation of ``cs``.

    Args:
        name: Passed through to ``get_pipe``, ``_set_pipe`` and
            ``execute_command``.
        cs: Change set whose first operation names the pipe by ``id``.

    Returns:
        The result of ``execute_command``, or an empty ``ChangeSet`` when
        ``cs`` has no operations, the operation has no ``id``, or no pipe
        with that id exists.
    """
    # An empty change set has nothing to apply; without this guard the
    # operations[0] lookup below would raise IndexError.
    if not cs.operations or 'id' not in cs.operations[0]:
        return ChangeSet()
    if not get_pipe(name, cs.operations[0]['id']):
        return ChangeSet()
    return execute_command(name, _set_pipe(name, cs))
|
|
|
|
|
|
def _add_pipe(name: str, cs: ChangeSet) -> DbChangeSet:
    """Build the redo/undo pair for inserting a new pipe.

    Redo inserts into ``_link`` and then ``pipes``; undo deletes from
    ``pipes`` and then ``_link``.

    Args:
        name: Not read by this function.
        cs: Change set whose first operation holds every pipe field.

    Returns:
        A ``DbChangeSet`` built from the redo SQL, the undo SQL and one
        redo and one undo change-set entry.
    """
    pipe = Pipe(cs.operations[0])

    redo_statements = [
        f"insert into _link (id, type) values ({pipe.f_id}, {pipe.f_type});",
        f"insert into pipes (id, node1, node2, length, diameter, roughness, minor_loss, status) values ({pipe.f_id}, {pipe.f_node1}, {pipe.f_node2}, {pipe.f_length}, {pipe.f_diameter}, {pipe.f_roughness}, {pipe.f_minor_loss}, {pipe.f_status});",
    ]
    undo_statements = [
        f"delete from pipes where id = {pipe.f_id};",
        f"delete from _link where id = {pipe.f_id};",
    ]

    redo_entry = g_add_prefix | pipe.as_dict()
    undo_entry = g_delete_prefix | pipe.as_id_dict()

    return DbChangeSet(
        "\n".join(redo_statements),
        "\n".join(undo_statements),
        [redo_entry],
        [undo_entry],
    )
|
|
|
|
|
|
def add_pipe(name: str, cs: ChangeSet) -> ChangeSet:
    """Insert a new pipe described by the first operation of ``cs``.

    Args:
        name: Passed through to ``get_pipe``, ``_add_pipe`` and
            ``execute_command``.
        cs: Change set whose first operation holds the new pipe's fields.

    Returns:
        The result of ``execute_command``, or an empty ``ChangeSet`` when
        ``cs`` has no operations, the operation has no ``id``, or a pipe
        with that id already exists.
    """
    # An empty change set has nothing to apply; without this guard the
    # operations[0] lookup below would raise IndexError.
    if not cs.operations or 'id' not in cs.operations[0]:
        return ChangeSet()
    if get_pipe(name, cs.operations[0]['id']):
        return ChangeSet()
    return execute_command(name, _add_pipe(name, cs))
|
|
|
|
|
|
def _delete_pipe(name: str, cs: ChangeSet) -> DbChangeSet:
    """Build the redo/undo pair for removing an existing pipe.

    Redo deletes from ``pipes`` and then ``_link``; undo re-inserts into
    ``_link`` and then ``pipes`` using the values currently stored.

    Args:
        name: Passed through to ``get_pipe``.
        cs: Change set whose first operation names the pipe by ``id``.

    Returns:
        A ``DbChangeSet`` built from the redo SQL, the undo SQL and one
        redo and one undo change-set entry.
    """
    pipe = Pipe(get_pipe(name, cs.operations[0]['id']))

    redo_statements = [
        f"delete from pipes where id = {pipe.f_id};",
        f"delete from _link where id = {pipe.f_id};",
    ]
    undo_statements = [
        f"insert into _link (id, type) values ({pipe.f_id}, {pipe.f_type});",
        f"insert into pipes (id, node1, node2, length, diameter, roughness, minor_loss, status) values ({pipe.f_id}, {pipe.f_node1}, {pipe.f_node2}, {pipe.f_length}, {pipe.f_diameter}, {pipe.f_roughness}, {pipe.f_minor_loss}, {pipe.f_status});",
    ]

    redo_entry = g_delete_prefix | pipe.as_id_dict()
    undo_entry = g_add_prefix | pipe.as_dict()

    return DbChangeSet(
        "\n".join(redo_statements),
        "\n".join(undo_statements),
        [redo_entry],
        [undo_entry],
    )
|
|
|
|
|
|
def delete_pipe(name: str, cs: ChangeSet) -> ChangeSet:
    """Delete the pipe named by the first operation of ``cs``.

    Args:
        name: Passed through to ``get_pipe``, ``_delete_pipe`` and
            ``execute_command``.
        cs: Change set whose first operation names the pipe by ``id``.

    Returns:
        The result of ``execute_command``, or an empty ``ChangeSet`` when
        ``cs`` has no operations, the operation has no ``id``, or no pipe
        with that id exists.
    """
    # An empty change set has nothing to apply; without this guard the
    # operations[0] lookup below would raise IndexError.
    if not cs.operations or 'id' not in cs.operations[0]:
        return ChangeSet()
    if not get_pipe(name, cs.operations[0]['id']):
        return ChangeSet()
    return execute_command(name, _delete_pipe(name, cs))
|
|
|
|
|
|
#--------------------------------------------------------------
# [EPA2][EPA3]
# [IN]
#   id  node1  node2  length  diam  rcoeff  (lcoeff  status)  ;desc
# [OUT]
#   id  node1  node2  length  diam  rcoeff  lcoeff  (status)  ;desc
# NOTE(review): fields in parentheses appear to be optional - confirm.
#--------------------------------------------------------------
|
|
|
|
|
|
def inp_in_pipe(line: str) -> str:
    """Translate one INP pipe line into the SQL that inserts the pipe.

    Expected layout: ``id node1 node2 length diam rcoeff (lcoeff status) ;desc``.
    Everything from the first ``;`` onward is treated as a description and
    ignored. A missing minor-loss coefficient defaults to ``0.0`` and a
    missing status to ``PIPE_STATUS_OPEN``; a given status is upper-cased.

    Args:
        line: One data line describing a pipe.

    Returns:
        Two SQL statements in one string: an insert into ``_link`` followed
        by an insert into ``pipes``.

    Raises:
        IndexError: If fewer than six data fields are present.
        ValueError: If a numeric field cannot be converted to ``float``.
    """
    def sql_text(value: str) -> str:
        # Double embedded single quotes so the value cannot end the SQL
        # string literal early.
        return "'" + value.replace("'", "''") + "'"

    # Cut the description off before tokenising, so that descriptions with
    # spaces, or written without a space before the ';', cannot be mistaken
    # for data fields.
    data, _, _ = line.partition(';')
    tokens = data.split()
    field_count = len(tokens)

    pipe_id = sql_text(tokens[0])
    node1 = sql_text(tokens[1])
    node2 = sql_text(tokens[2])
    length = float(tokens[3])
    diameter = float(tokens[4])
    roughness = float(tokens[5])
    # lcoeff and status are optional on input; status is mandatory in the
    # database, so it is filled in here when absent.
    minor_loss = float(tokens[6]) if field_count >= 7 else 0.0
    status = sql_text(tokens[7].upper() if field_count >= 8 else PIPE_STATUS_OPEN)

    return (
        f"insert into _link (id, type) values ({pipe_id}, 'pipe');"
        f"insert into pipes (id, node1, node2, length, diameter, roughness, minor_loss, status) "
        f"values ({pipe_id}, {node1}, {node2}, {length}, {diameter}, {roughness}, {minor_loss}, {status});"
    )
|
|
|
|
|
|
def inp_out_pipe(name: str) -> list[str]:
    """Render every row of the ``pipes`` table as an INP pipe line.

    Each line holds ``id node1 node2 length diameter roughness minor_loss
    status`` separated by single spaces and ends with an empty ``;``
    description marker.

    Args:
        name: Passed through to ``read_all`` as its first argument.

    Returns:
        One formatted line per row, in the order ``read_all`` yields them.
    """
    columns = (
        'id', 'node1', 'node2', 'length',
        'diameter', 'roughness', 'minor_loss', 'status',
    )
    output = []
    for row in read_all(name, 'select * from pipes'):
        fields = [f'{row[column]}' for column in columns]
        fields.append(';')  # empty description
        output.append(' '.join(fields))
    return output
|
|
|
|
|
|
# NOTE: disabled helper. It is wrapped in a string literal, so it is never
# defined or executed; kept for reference only.
'''def delete_pipe_by_node(name: str, node: str) -> ChangeSet:
    cs = ChangeSet()

    rows = read_all(name, f"select id from pipes where node1 = '{node}' or node2 = '{node}'")
    for row in rows:
        cs.append(g_delete_prefix | {'type': 'pipe', 'id': row['id']})

    return cs'''
|