from typing import Any from psycopg.rows import Row from .s0_base import * from .change_set import ChangeSet from .s24_coordinates import * from .utility import * from .schema import * schema: dict[str, dict[str, Any]] = { \ 'id' : define_property(str_type, False, True), \ 'node1' : define_property(str_type), \ 'node2' : define_property(str_type)} def get_pump_schema(name: str) -> dict[str, dict[str, Any]]: return schema def _query_pump(name: str, id: str) -> Row | None: return read(name, f"select * from pumps where id = '{id}'") def _get_pump_node1(name: str, id: str) -> str | None: row = _query_pump(name, id) return row['node1'] if row != None else None def _get_pump_node2(name: str, id: str) -> str | None: row = _query_pump(name, id) return row['node2'] if row != None else None def add_pump(name: str, id: str, node1: str, node2: str) -> ChangeSet: if is_pump(name, id): return ChangeSet() if not is_node(name, node1): return ChangeSet() if not is_node(name, node2): return ChangeSet() if node1 == node2: return ChangeSet() sql = f"insert into _link (id, type) values ('{id}', '{PUMP}');" sql += f"\ninsert into pumps (id, node1, node2) values ('{id}', '{node1}', '{node2}');" undo = f"delete from pumps where id = ''{id}'';" undo += f"\ndelete from _link where id = ''{id}'';" write(name, sql) add_operation(name, sql.replace("'", "''"), undo, 'add_pump', API_ADD, PUMP, id) return get_current_change_set(name) def get_pump(name: str, id: str) -> dict[str, Any] | None: row = _query_pump(name, id) if row == None: return None ps: dict[str, str] = {} ps['id'] = id ps['node1'] = row['node1'] ps['node2'] = row['node2'] return ps def set_pump(name: str, id: str, properties: dict[str, Any]) -> ChangeSet: if not is_pump(name, id): return ChangeSet() if 'node1' in properties: if not is_node(name, properties['node1']) or _get_pump_node2(name, id) == properties['node1']: return ChangeSet() if 'node2' in properties: if not is_node(name, properties['node2']) or _get_pump_node1(name, id) == properties['node2']: return ChangeSet() if 'node1' in properties and 'node2' in properties: if properties['node1'] == properties['node2']: return ChangeSet() old = Serialize(get_pump(name, id), schema).to_storage() new = get_pump(name, id) ps: list[str] = [] for key in properties: if key in schema and schema[key]['readonly'] == False: new[key] = properties[key] ps.append(key) new = Serialize(new, schema).to_execution() sql = f"update pumps set node1 = {new['node1']}, node2 = {new['node2']} where id = '{id}';" undo = f"update pumps set node1 = {old['node1']}, node2 = {old['node2']} where id = ''{id}'';" write(name, sql) add_operation(name, sql.replace("'", "''"), undo, 'set_pump', API_UPDATE, PUMP, id, ps) return get_current_change_set(name) def delete_pump(name: str, id: str) -> ChangeSet: row = get_pump(name, id) if row == None: return ChangeSet() old = Serialize(get_pump(name, id), schema).to_storage() sql = f"delete from pumps where id = '{id}';" sql += f"\ndelete from _link where id = '{id}';" undo = f"insert into _link (id, type) values (''{id}'', ''{PUMP}'');" undo += f"\ninsert into pumps (id, node1, node2) values (''{id}'', {old['node1']}, {old['node2']});" write(name, sql) add_operation(name, sql.replace("'", "''"), undo, 'delete_pump', API_DELETE, PUMP, id) return get_current_change_set(name)