from .operation import *


# Column names of the single-row ``reactions_global`` table, in the order they
# appear in the schema, the result dicts and the generated UPDATE statement.
_GLOBAL_REACTION_FIELDS = (
    'order_bulk',
    'order_wall',
    'order_tank',
    'global_bulk',
    'global_wall',
    'limiting_potential',
    'roughness_correlation',
)


def _opt_float(value: Any) -> Any:
    """Return ``float(value)``, or ``None`` when *value* is ``None``."""
    return float(value) if value is not None else None


def _sql_value(value: Any) -> Any:
    """Return *value* unchanged, or the SQL keyword ``'null'`` when it is ``None``."""
    return value if value is not None else 'null'


def _sql_str(text: Any) -> str:
    """Return *text* as a single-quoted SQL string literal.

    Embedded single quotes are doubled so that an id containing ``'`` can
    neither break nor extend the statement it is interpolated into. For ids
    without quotes the result is identical to plain ``f"'{text}'"``.
    """
    return "'" + str(text).replace("'", "''") + "'"


def _read_optional_value(name: str, sql: str) -> Any:
    """Run *sql* through ``try_read`` and return the row's ``value`` as a float.

    Returns ``None`` when ``try_read`` yields no row or the stored value is null.
    """
    row = try_read(name, sql)
    return _opt_float(row['value']) if row is not None else None


def _apply_writable(current: dict[str, Any], requested: dict[str, Any],
                    schema: dict[str, dict[str, Any]]) -> dict[str, Any]:
    """Copy into *current* every schema key present in *requested* that is not readonly.

    Keys of *requested* that are not in *schema* (for example ``'type'``) are
    ignored. *current* is modified in place and returned.
    """
    for key, spec in schema.items():
        if key in requested and not spec['readonly']:
            current[key] = requested[key]
    return current


# ---------------------------------------------------------------------------
# Global reaction
# ---------------------------------------------------------------------------

def get_global_reaction_schema(name: str) -> dict[str, dict[str, Any]]:
    """Return the property schema of the global reaction record.

    Every field is an optional, writable float. *name* is accepted for
    signature parity with the other schema getters and is not used.
    """
    return {
        field: {'type': 'float', 'optional': True, 'readonly': False}
        for field in _GLOBAL_REACTION_FIELDS
    }


def get_global_reaction(name: str) -> dict[str, Any]:
    """Read the ``reactions_global`` row of project *name*.

    Returns a dict with one entry per global reaction field; each value is a
    float, or ``None`` when the column is null.
    """
    row = read(name, "select * from reactions_global")
    return {field: _opt_float(row[field]) for field in _GLOBAL_REACTION_FIELDS}


class GlobalReaction(object):
    """Typed view of a global reaction dict plus SQL-ready ``f_*`` renderings.

    For every field ``x`` the instance carries ``x`` (float or ``None``) and
    ``f_x`` (the float, or the string ``'null'``) for interpolation into SQL.
    """

    def __init__(self, input: dict[str, Any]) -> None:
        # NOTE: the parameter name ``input`` shadows the builtin; it is kept
        # because it is part of the constructor's public signature.
        self.type = 'global_reaction'
        # Missing keys and explicit ``None`` values both become ``None``.
        self.order_bulk = _opt_float(input.get('order_bulk'))
        self.order_wall = _opt_float(input.get('order_wall'))
        self.order_tank = _opt_float(input.get('order_tank'))
        self.global_bulk = _opt_float(input.get('global_bulk'))
        self.global_wall = _opt_float(input.get('global_wall'))
        self.limiting_potential = _opt_float(input.get('limiting_potential'))
        self.roughness_correlation = _opt_float(input.get('roughness_correlation'))

        self.f_type = f"'{self.type}'"
        self.f_order_bulk = _sql_value(self.order_bulk)
        self.f_order_wall = _sql_value(self.order_wall)
        self.f_order_tank = _sql_value(self.order_tank)
        self.f_global_bulk = _sql_value(self.global_bulk)
        self.f_global_wall = _sql_value(self.global_wall)
        self.f_limiting_potential = _sql_value(self.limiting_potential)
        self.f_roughness_correlation = _sql_value(self.roughness_correlation)

    def as_dict(self) -> dict[str, Any]:
        """Return ``{'type': ..., <field>: <value>, ...}`` for this record."""
        result: dict[str, Any] = {'type': self.type}
        for field in _GLOBAL_REACTION_FIELDS:
            result[field] = getattr(self, field)
        return result


def _global_reaction_update_sql(reaction: GlobalReaction) -> str:
    """Build the UPDATE statement that writes *reaction* into row ``_no = 0``."""
    assignments = ', '.join(
        f"{field} = {getattr(reaction, 'f_' + field)}"
        for field in _GLOBAL_REACTION_FIELDS
    )
    return f"update reactions_global set {assignments} where _no = 0;"


def set_global_reaction_cache(name: str, cs: ChangeSet) -> SqlChangeSet:
    """Build the redo/undo SQL and change sets for a global reaction update.

    The first operation of *cs* supplies the new values; only keys that the
    schema marks as writable are taken from it, all other fields keep their
    current database value. Nothing is executed here.
    """
    # One read serves both the "old" snapshot and the base of the new record.
    current = get_global_reaction(name)
    old = GlobalReaction(current)
    merged = _apply_writable(dict(current), cs.operations[0],
                             get_global_reaction_schema(name))
    new = GlobalReaction(merged)

    redo_sql = _global_reaction_update_sql(new)
    undo_sql = _global_reaction_update_sql(old)

    redo_cs = g_update_prefix | new.as_dict()
    undo_cs = g_update_prefix | old.as_dict()

    return SqlChangeSet(redo_sql, undo_sql, redo_cs, undo_cs)


def set_global_reaction(name: str, cs: ChangeSet) -> ChangeSet:
    """Apply a global reaction update to project *name* via ``execute_command``."""
    return execute_command(name, set_global_reaction_cache(name, cs))


# ---------------------------------------------------------------------------
# Pipe reaction
# ---------------------------------------------------------------------------

def get_pipe_reaction_schema(name: str) -> dict[str, dict[str, Any]]:
    """Return the property schema of a pipe reaction record.

    ``pipe`` is the required, readonly key; ``bulk`` and ``wall`` are optional
    writable floats. *name* is not used.
    """
    return {
        'pipe': {'type': 'str', 'optional': False, 'readonly': True},
        'bulk': {'type': 'float', 'optional': True, 'readonly': False},
        'wall': {'type': 'float', 'optional': True, 'readonly': False},
    }


def get_pipe_reaction(name: str, pipe: str) -> dict[str, Any]:
    """Read the bulk and wall reaction values stored for *pipe*.

    Returns ``{'pipe': pipe, 'bulk': ..., 'wall': ...}``; a value is ``None``
    when the corresponding table has no row for the pipe.
    """
    pipe_literal = _sql_str(pipe)
    return {
        'pipe': pipe,
        'bulk': _read_optional_value(
            name, f"select * from reactions_pipe_bulk where pipe = {pipe_literal}"),
        'wall': _read_optional_value(
            name, f"select * from reactions_pipe_wall where pipe = {pipe_literal}"),
    }


class PipeReaction(object):
    """Typed view of a pipe reaction dict plus SQL-ready ``f_*`` renderings."""

    def __init__(self, input: dict[str, Any]) -> None:
        self.type = 'pipe_reaction'
        self.pipe = str(input['pipe'])
        self.bulk = _opt_float(input.get('bulk'))
        self.wall = _opt_float(input.get('wall'))

        self.f_type = f"'{self.type}'"
        # Quoted and escaped so the id is always a well-formed SQL literal.
        self.f_pipe = _sql_str(self.pipe)
        self.f_bulk = _sql_value(self.bulk)
        self.f_wall = _sql_value(self.wall)

    def as_dict(self) -> dict[str, Any]:
        """Return ``{'type', 'pipe', 'bulk', 'wall'}`` for this record."""
        return {
            'type': self.type,
            'pipe': self.pipe,
            'bulk': self.bulk,
            'wall': self.wall,
        }


def _pipe_reaction_sql(reaction: PipeReaction) -> str:
    """Build the SQL that makes the database match *reaction*.

    Existing rows for the pipe are deleted from both tables, then a row is
    re-inserted for each of ``bulk`` / ``wall`` that is not ``None``.
    """
    statements = [
        f"delete from reactions_pipe_bulk where pipe = {reaction.f_pipe};",
        f"delete from reactions_pipe_wall where pipe = {reaction.f_pipe};",
    ]
    if reaction.bulk is not None:
        statements.append(
            f"insert into reactions_pipe_bulk (pipe, value) values ({reaction.f_pipe}, {reaction.f_bulk});")
    if reaction.wall is not None:
        statements.append(
            f"insert into reactions_pipe_wall (pipe, value) values ({reaction.f_pipe}, {reaction.f_wall});")
    return '\n'.join(statements)


def set_pipe_reaction_cache(name: str, cs: ChangeSet) -> SqlChangeSet:
    """Build the redo/undo SQL and change sets for a pipe reaction update.

    The first operation of *cs* must contain ``'pipe'``; writable keys present
    in it override the values currently stored for that pipe. Nothing is
    executed here.
    """
    requested = cs.operations[0]

    # One read serves both the "old" snapshot and the base of the new record.
    current = get_pipe_reaction(name, requested['pipe'])
    old = PipeReaction(current)
    merged = _apply_writable(dict(current), requested,
                             get_pipe_reaction_schema(name))
    new = PipeReaction(merged)

    redo_sql = _pipe_reaction_sql(new)
    undo_sql = _pipe_reaction_sql(old)

    redo_cs = g_update_prefix | new.as_dict()
    undo_cs = g_update_prefix | old.as_dict()

    return SqlChangeSet(redo_sql, undo_sql, redo_cs, undo_cs)


def set_pipe_reaction(name: str, cs: ChangeSet) -> ChangeSet:
    """Apply a pipe reaction update to project *name* via ``execute_command``."""
    return execute_command(name, set_pipe_reaction_cache(name, cs))


# ---------------------------------------------------------------------------
# Tank reaction
# ---------------------------------------------------------------------------

def get_tank_reaction_schema(name: str) -> dict[str, dict[str, Any]]:
    """Return the property schema of a tank reaction record.

    ``tank`` is the required, readonly key; ``value`` is an optional writable
    float. *name* is not used.
    """
    return {
        'tank': {'type': 'str', 'optional': False, 'readonly': True},
        'value': {'type': 'float', 'optional': True, 'readonly': False},
    }


def get_tank_reaction(name: str, tank: str) -> dict[str, Any]:
    """Read the reaction value stored for *tank*.

    Returns ``{'tank': tank, 'value': ...}``; ``value`` is ``None`` when
    ``reactions_tank`` has no row for the tank.
    """
    return {
        'tank': tank,
        'value': _read_optional_value(
            name, f"select * from reactions_tank where tank = {_sql_str(tank)}"),
    }


class TankReaction(object):
    """Typed view of a tank reaction dict plus SQL-ready ``f_*`` renderings."""

    def __init__(self, input: dict[str, Any]) -> None:
        self.type = 'tank_reaction'
        self.tank = str(input['tank'])
        self.value = _opt_float(input.get('value'))

        self.f_type = f"'{self.type}'"
        # Quoted and escaped so the id is always a well-formed SQL literal.
        self.f_tank = _sql_str(self.tank)
        self.f_value = _sql_value(self.value)

    def as_dict(self) -> dict[str, Any]:
        """Return ``{'type', 'tank', 'value'}`` for this record."""
        return {
            'type': self.type,
            'tank': self.tank,
            'value': self.value,
        }


def _tank_reaction_sql(reaction: TankReaction) -> str:
    """Build the SQL that makes ``reactions_tank`` match *reaction*.

    The tank's existing row is deleted, then re-inserted when ``value`` is not
    ``None``.
    """
    statements = [f"delete from reactions_tank where tank = {reaction.f_tank};"]
    if reaction.value is not None:
        statements.append(
            f"insert into reactions_tank (tank, value) values ({reaction.f_tank}, {reaction.f_value});")
    return '\n'.join(statements)


def set_tank_reaction_cache(name: str, cs: ChangeSet) -> SqlChangeSet:
    """Build the redo/undo SQL and change sets for a tank reaction update.

    The first operation of *cs* must contain ``'tank'``; a ``'value'`` key, if
    present, overrides the value currently stored for that tank. Nothing is
    executed here.
    """
    requested = cs.operations[0]

    # One read serves both the "old" snapshot and the base of the new record.
    current = get_tank_reaction(name, requested['tank'])
    old = TankReaction(current)
    merged = _apply_writable(dict(current), requested,
                             get_tank_reaction_schema(name))
    new = TankReaction(merged)

    redo_sql = _tank_reaction_sql(new)
    undo_sql = _tank_reaction_sql(old)

    redo_cs = g_update_prefix | new.as_dict()
    undo_cs = g_update_prefix | old.as_dict()

    return SqlChangeSet(redo_sql, undo_sql, redo_cs, undo_cs)


def set_tank_reaction(name: str, cs: ChangeSet) -> ChangeSet:
    """Apply a tank reaction update to project *name* via ``execute_command``."""
    return execute_command(name, set_tank_reaction_cache(name, cs))