"""Pipe link API: schema plus add / get / set / delete operations.

Each mutating function builds a forward SQL script and a matching undo
script, executes the forward script with ``write`` and records both with
``add_operation``.  When validation fails the function returns a fresh
``ChangeSet()`` and nothing is written.
"""

from typing import Any
from psycopg.rows import Row
from .s0_base import *
from .change_set import ChangeSet
from .s24_coordinates import *
from .utility import *
from .schema import *


PIPE_STATUS_OPEN = 'open'
PIPE_STATUS_CLOSED = 'closed'
PIPE_STATUS_CV = 'cv'

# The only values accepted for a pipe's ``status`` column.
_PIPE_STATUSES = (PIPE_STATUS_OPEN, PIPE_STATUS_CLOSED, PIPE_STATUS_CV)


# Property descriptors for a pipe; ``id`` is the only one declared with
# extra positional flags (False, True) -- set_pipe below skips every key
# whose descriptor has a truthy 'readonly' entry.
schema: dict[str, dict[str, Any]] = {
    'id': define_property(str_type, False, True),
    'node1': define_property(str_type),
    'node2': define_property(str_type),
    'length': define_property(float_type),
    'diameter': define_property(float_type),
    'roughness': define_property(float_type),
    'minor_loss': define_property(float_type),
    'status': define_property(str_type),
}


def _is_valid_status(status: Any) -> bool:
    """Return True when *status* equals one of the PIPE_STATUS_* constants."""
    return status in _PIPE_STATUSES


def get_pipe_schema(name: str) -> dict[str, dict[str, Any]]:
    """Return the pipe property schema (*name* is accepted but not used)."""
    return schema


def _query_pipe(name: str, id: str) -> Row | None:
    """Read the ``pipes`` row whose id is *id*, or None when ``read`` finds none.

    NOTE(review): *id* is interpolated straight into the SQL text here and
    in every statement below.  If ids can come from untrusted input this is
    an injection risk; the ``read``/``write`` helpers are defined elsewhere,
    so switching to bound parameters needs a change there first.
    """
    return read(name, f"select * from pipes where id = '{id}'")


def _get_pipe_node1(name: str, id: str) -> str | None:
    """Return the pipe's ``node1`` value, or None if the pipe row is missing."""
    row = _query_pipe(name, id)
    return row['node1'] if row is not None else None


def _get_pipe_node2(name: str, id: str) -> str | None:
    """Return the pipe's ``node2`` value, or None if the pipe row is missing."""
    row = _query_pipe(name, id)
    return row['node2'] if row is not None else None


def add_pipe(name: str, id: str, node1: str, node2: str, length: float = 0,
             diameter: float = 0, roughness: float = 0, minor_loss: float = 0,
             status: str = PIPE_STATUS_OPEN) -> ChangeSet:
    """Insert a new pipe between *node1* and *node2*.

    Returns ``ChangeSet()`` without writing anything when *id* is already a
    pipe, either endpoint is not a node, both endpoints are the same, or
    *status* is not one of the PIPE_STATUS_* constants.  Otherwise returns
    ``get_current_change_set(name)`` after the insert has been written and
    recorded.
    """
    if is_pipe(name, id):
        return ChangeSet()
    if not is_node(name, node1):
        return ChangeSet()
    if not is_node(name, node2):
        return ChangeSet()
    if node1 == node2:
        return ChangeSet()
    if not _is_valid_status(status):
        return ChangeSet()

    sql = f"insert into _link (id, type) values ('{id}', '{PIPE}');"
    sql += (
        "\ninsert into pipes (id, node1, node2, length, diameter, roughness, minor_loss, status) "
        f"values ('{id}', '{node1}', '{node2}', {length}, {diameter}, {roughness}, {minor_loss}, '{status}');"
    )

    # Undo text is written with doubled single quotes, matching the
    # replace("'", "''") applied to the forward script below -- presumably
    # because add_operation embeds both inside a SQL string literal.
    undo = f"delete from pipes where id = ''{id}'';"
    undo += f"\ndelete from _link where id = ''{id}'';"

    write(name, sql)
    add_operation(name, sql.replace("'", "''"), undo, 'add_pipe', API_ADD, PIPE, id)
    return get_current_change_set(name)


def get_pipe(name: str, id: str) -> dict[str, Any] | None:
    """Return the pipe's properties as a dict, or None if no such row exists.

    ``length``, ``diameter``, ``roughness`` and ``minor_loss`` are converted
    with ``float``; the remaining values are returned as stored.
    """
    row = _query_pipe(name, id)
    if row is None:
        return None
    return {
        'id': id,
        'node1': row['node1'],
        'node2': row['node2'],
        'length': float(row['length']),
        'diameter': float(row['diameter']),
        'roughness': float(row['roughness']),
        'minor_loss': float(row['minor_loss']),
        'status': row['status'],
    }


def set_pipe(name: str, id: str, properties: dict[str, Any]) -> ChangeSet:
    """Update the writable properties of pipe *id* from *properties*.

    Keys that are not in ``schema`` or are flagged readonly are ignored.
    Returns ``ChangeSet()`` without writing when the pipe does not exist, a
    supplied endpoint is not a node, the pipe would end up with identical
    endpoints, or a supplied ``status`` is invalid.  Otherwise returns
    ``get_current_change_set(name)`` after the update has been written and
    recorded.
    """
    if not is_pipe(name, id):
        return ChangeSet()

    current = get_pipe(name, id)
    if current is None:
        return ChangeSet()

    for endpoint in ('node1', 'node2'):
        if endpoint in properties and not is_node(name, properties[endpoint]):
            return ChangeSet()

    # Compare the endpoints the pipe will have AFTER the update.  Checking a
    # new endpoint against the old opposite endpoint would wrongly reject
    # valid requests that replace both ends at once (e.g. reversing a pipe).
    if 'node1' in properties or 'node2' in properties:
        final_node1 = properties['node1'] if 'node1' in properties else current['node1']
        final_node2 = properties['node2'] if 'node2' in properties else current['node2']
        if final_node1 == final_node2:
            return ChangeSet()

    if 'status' in properties and not _is_valid_status(properties['status']):
        return ChangeSet()

    # Snapshot the stored values for the undo script from a copy, so the
    # dict mutated below is never shared with the serializer.
    old = Serialize(dict(current), schema).to_storage()

    new = current
    ps: list[str] = []
    for key in properties:
        if key in schema and not schema[key]['readonly']:
            new[key] = properties[key]
            ps.append(key)
    new = Serialize(new, schema).to_execution()

    sql = (
        f"update pipes set node1 = {new['node1']}, node2 = {new['node2']}, "
        f"length = {new['length']}, diameter = {new['diameter']}, roughness = {new['roughness']}, "
        f"minor_loss = {new['minor_loss']}, status = {new['status']} where id = '{id}';"
    )
    undo = (
        f"update pipes set node1 = {old['node1']}, node2 = {old['node2']}, "
        f"length = {old['length']}, diameter = {old['diameter']}, roughness = {old['roughness']}, "
        f"minor_loss = {old['minor_loss']}, status = {old['status']} where id = ''{id}'';"
    )

    write(name, sql)
    add_operation(name, sql.replace("'", "''"), undo, 'set_pipe', API_UPDATE, PIPE, id, ps)
    return get_current_change_set(name)


def delete_pipe(name: str, id: str) -> ChangeSet:
    """Delete pipe *id* from ``pipes`` and ``_link``.

    Returns ``ChangeSet()`` when the pipe does not exist; otherwise returns
    ``get_current_change_set(name)`` after the delete has been written and
    recorded together with an undo script that re-inserts the old values.
    """
    row = get_pipe(name, id)
    if row is None:
        return ChangeSet()

    old = Serialize(row, schema).to_storage()

    sql = f"delete from pipes where id = '{id}';"
    sql += f"\ndelete from _link where id = '{id}';"

    undo = f"insert into _link (id, type) values (''{id}'', ''{PIPE}'');"
    undo += (
        "\ninsert into pipes (id, node1, node2, length, diameter, roughness, minor_loss, status) "
        f"values (''{id}'', {old['node1']}, {old['node2']}, {old['length']}, {old['diameter']}, "
        f"{old['roughness']}, {old['minor_loss']}, {old['status']});"
    )

    write(name, sql)
    add_operation(name, sql.replace("'", "''"), undo, 'delete_pipe', API_DELETE, PIPE, id)
    return get_current_change_set(name)