from .operation import * OPTION_UNITS_CFS = 'CFS' OPTION_UNITS_GPM = 'GPM' OPTION_UNITS_MGD = 'MGD' OPTION_UNITS_IMGD = 'IMGD' OPTION_UNITS_AFD = 'AFD' OPTION_UNITS_LPS = 'LPS' OPTION_UNITS_LPM = 'LPM' OPTION_UNITS_MLD = 'MLD' OPTION_UNITS_CMH = 'CMH' OPTION_UNITS_CMD = 'CMD' OPTION_HEADLOSS_HW = 'H-W' OPTION_HEADLOSS_DW = 'D-W' OPTION_HEADLOSS_CM = 'C-M' #OPTION_HYDRAULICS_USE = 'USE' #OPTION_HYDRAULICS_SAVE = 'SAVE' OPTION_UNBALANCED_STOP = 'STOP' OPTION_UNBALANCED_CONTINUE = 'CONTINUE' OPTION_DEMAND_MODEL_DDA = 'DDA' OPTION_DEMAND_MODEL_PDA = 'PDA' OPTION_QUALITY_NONE = 'NONE' OPTION_QUALITY_CHEMICAL = 'CHEMICAL' OPTION_QUALITY_AGE = 'AGE' OPTION_QUALITY_TRACE = 'TRACE' element_schema = {'type': 'str' , 'optional': True , 'readonly': False} def get_option_schema(name: str) -> dict[str, dict[str, Any]]: return { 'UNITS' : element_schema, 'HEADLOSS' : element_schema, #'HYDRAULICS' : element_schema, 'VISCOSITY' : element_schema, 'SPECIFIC GRAVITY' : element_schema, 'TRIALS' : element_schema, 'ACCURACY' : element_schema, 'FLOWCHANGE' : element_schema, 'HEADERROR' : element_schema, 'CHECKFREQ' : element_schema, 'MAXCHECK' : element_schema, 'DAMPLIMIT' : element_schema, 'UNBALANCED' : element_schema, 'DEMAND MODEL' : element_schema, 'MINIMUM PRESSURE' : element_schema, 'REQUIRED PRESSURE' : element_schema, 'PRESSURE EXPONENT' : element_schema, 'PATTERN' : element_schema, 'DEMAND MULTIPLIER' : element_schema, 'EMITTER EXPONENT' : element_schema, 'QUALITY' : element_schema, 'DIFFUSIVITY' : element_schema, 'TOLERANCE' : element_schema, #'MAP' : element_schema, } def get_option(name: str) -> dict[str, Any]: ts = read_all(name, f"select * from options") d = {} for e in ts: d[e['key']] = str(e['value']) return d def set_option_cache(name: str, cs: ChangeSet) -> SqlChangeSet: raw_old = get_option(name) old = {} new = {} new_dict = cs.operations[0] schema = get_option_schema(name) for key in schema.keys(): if key in new_dict: old[key] = str(raw_old[key]) new[key] = str(new_dict[key]) redo_cs = g_update_prefix | { 'type' : 'option' } redo_sql = '' for key, value in new.items(): if redo_sql != '': redo_sql += '\n' redo_sql += f"update options set value = '{value}' where key = '{key}';" redo_cs |= { key: value } undo_cs = g_update_prefix | { 'type' : 'option' } undo_sql = '' for key, value in old.items(): if undo_sql != '': undo_sql += '\n' undo_sql += f"update options set value = '{value}' where key = '{key}';" undo_cs |= { key: value } return SqlChangeSet(redo_sql, undo_sql, redo_cs, undo_cs) def set_option(name: str, cs: ChangeSet) -> ChangeSet: return execute_command(name, set_option_cache(name, cs)) def inp_in_option(section: list[str]) -> ChangeSet: cs = g_update_prefix | { 'type' : 'option' } for s in section: line = s.upper().strip() for key in get_option_schema('').keys(): if line.startswith(key): value = line.removeprefix(key).strip() cs |= { key : value } return ChangeSet(cs) def inp_out_option(name: str) -> list[str]: lines = [] objs = read_all(name, f"select * from options") for obj in objs: key = obj['key'] value = obj['value'] lines.append(f'{key} {value}') return lines