from .database import *

#--------------------------------------------------------------
# [EPANET2][IN][OUT]
#    UNITS               CFS/GPM/MGD/IMGD/AFD/LPS/LPM/MLD/CMH/CMD/SI
#    PRESSURE            PSI/KPA/M
#    HEADLOSS            H-W/D-W/C-M
#    QUALITY             NONE/AGE/TRACE/CHEMICAL (TraceNode)
#    UNBALANCED          STOP/CONTINUE {Niter}
#    PATTERN             id
#    DEMAND MODEL        DDA/PDA
#    DEMAND MULTIPLIER   value
#    EMITTER EXPONENT    value
#    VISCOSITY           value
#    DIFFUSIVITY         value
#    SPECIFIC GRAVITY    value
#    TRIALS              value
#    ACCURACY            value
#
#    HEADERROR           value
#    FLOWCHANGE          value
#    MINIMUM PRESSURE    value
#    REQUIRED PRESSURE   value
#    PRESSURE EXPONENT   value
#
#    TOLERANCE           value
#    HTOL                value
#    QTOL                value
#    RQTOL               value
#    CHECKFREQ           value
#    MAXCHECK            value
#    DAMPLIMIT           value
#  ---- Unsupported Options -----
#    HYDRAULICS          USE/SAVE filename
#    MAP                 filename
#--------------------------------------------------------------


# Every option is described by the same schema entry: an optional,
# writable string value.
element_schema = {'type': 'str', 'optional': True, 'readonly': False}


OPTION_UNITS_CFS = 'CFS'
OPTION_UNITS_GPM = 'GPM'
OPTION_UNITS_MGD = 'MGD'
OPTION_UNITS_IMGD = 'IMGD'
OPTION_UNITS_AFD = 'AFD'
OPTION_UNITS_LPS = 'LPS'
OPTION_UNITS_LPM = 'LPM'
OPTION_UNITS_MLD = 'MLD'
OPTION_UNITS_CMH = 'CMH'
OPTION_UNITS_CMD = 'CMD'

OPTION_PRESSURE_PSI = 'PSI'
OPTION_PRESSURE_KPA = 'KPA'
OPTION_PRESSURE_METERS = 'METERS'

OPTION_HEADLOSS_HW = 'H-W'
OPTION_HEADLOSS_DW = 'D-W'
OPTION_HEADLOSS_CM = 'C-M'

OPTION_UNBALANCED_STOP = 'STOP'
OPTION_UNBALANCED_CONTINUE = 'CONTINUE'

OPTION_DEMAND_MODEL_DDA = 'DDA'
OPTION_DEMAND_MODEL_PDA = 'PDA'

OPTION_QUALITY_NONE = 'NONE'
OPTION_QUALITY_CHEMICAL = 'CHEMICAL'
OPTION_QUALITY_AGE = 'AGE'
OPTION_QUALITY_TRACE = 'TRACE'


def _sql_quote(value: str) -> str:
    """Escape *value* for use inside a single-quoted SQL string literal.

    Embedded single quotes are doubled (the standard SQL escape) so that a
    value such as ``O'Neil`` neither breaks the generated statement nor
    lets the text terminate the literal early.
    """
    return value.replace("'", "''")


def _build_option_changeset(
    table: str,
    type_name: str,
    raw_old: dict[str, Any],
    schema: dict[str, dict[str, Any]],
    new_dict: dict[str, Any],
) -> DbChangeSet:
    """Build the redo/undo ``DbChangeSet`` shared by both option tables.

    Args:
        table: Name of the key/value table to update (``options`` or
            ``options_v3``). Always a module constant, never user input.
        type_name: Value stored under ``'type'`` in the redo/undo change sets.
        raw_old: Current table content as returned by the matching getter.
        schema: Schema of the table; only keys present here are considered.
        new_dict: Requested operation; entries whose key is not in *schema*
            are ignored.

    Returns:
        A ``DbChangeSet`` holding one ``update`` statement per changed key
        for redo and undo (newline-separated, empty string when nothing
        matched) plus the corresponding change-set dictionaries.

    Raises:
        KeyError: If a requested key is in *schema* but missing from
            *raw_old* (there is no previous value to restore on undo).
    """
    old = {}
    new = {}
    # Iterate the schema (not the request) so the statement order follows
    # the schema's key order.
    for key in schema:
        if key in new_dict:
            old[key] = str(raw_old[key])
            new[key] = str(new_dict[key])

    def _updates(values: dict[str, str]) -> str:
        # Keys come from the schema; values are caller-supplied and must be
        # quoted before being embedded in the statement.
        return '\n'.join(
            f"update {table} set value = '{_sql_quote(value)}' where key = '{key}';"
            for key, value in values.items()
        )

    redo_cs = g_update_prefix | {'type': type_name} | new
    undo_cs = g_update_prefix | {'type': type_name} | old

    return DbChangeSet(_updates(new), _updates(old), [redo_cs], [undo_cs])


def get_option_schema(name: str) -> dict[str, dict[str, Any]]:
    """Return the schema of the EPANET 2 options.

    Args:
        name: Unused; kept so the signature matches the other accessors.

    Returns:
        A dict mapping every supported option key to ``element_schema``
        (the same dict object is shared by all keys).
    """
    keys = (
        'UNITS',
        'PRESSURE',
        'HEADLOSS',
        'QUALITY',
        'UNBALANCED',
        'PATTERN',
        'DEMAND MODEL',
        'DEMAND MULTIPLIER',
        'EMITTER EXPONENT',
        'VISCOSITY',
        'DIFFUSIVITY',
        'SPECIFIC GRAVITY',
        'TRIALS',
        'ACCURACY',
        'HEADERROR',
        'FLOWCHANGE',
        'MINIMUM PRESSURE',
        'REQUIRED PRESSURE',
        'PRESSURE EXPONENT',
        'TOLERANCE',
        'HTOL',
        'QTOL',
        'RQTOL',
        'CHECKFREQ',
        'MAXCHECK',
        'DAMPLIMIT',
    )
    return {key: element_schema for key in keys}


def get_option(name: str) -> dict[str, Any]:
    """Read every row of the ``options`` table into a key -> value dict.

    Args:
        name: Forwarded to ``read_all`` as its first argument
            (presumably the project/database name; confirm against
            ``read_all``).

    Returns:
        A dict keyed by each row's ``key`` column; values are converted
        with ``str``.
    """
    rows = read_all(name, "select * from options")
    return {row['key']: str(row['value']) for row in rows}


def _set_option(name: str, cs: ChangeSet) -> DbChangeSet:
    """Build the redo/undo change set for an EPANET 2 option update.

    Only the first operation of *cs* is used, and only keys listed in
    ``get_option_schema`` are applied.

    Raises:
        KeyError: If a requested key is absent from the ``options`` table.
    """
    return _build_option_changeset(
        'options',
        'option',
        get_option(name),
        get_option_schema(name),
        cs.operations[0],
    )


def set_option(name: str, cs: ChangeSet) -> ChangeSet:
    """Apply an EPANET 2 option update through ``execute_command``."""
    return execute_command(name, _set_option(name, cs))


OPTION_V3_FLOW_UNITS_CFS = OPTION_UNITS_CFS
OPTION_V3_FLOW_UNITS_GPM = OPTION_UNITS_GPM
OPTION_V3_FLOW_UNITS_MGD = OPTION_UNITS_MGD
OPTION_V3_FLOW_UNITS_IMGD = OPTION_UNITS_IMGD
OPTION_V3_FLOW_UNITS_AFD = OPTION_UNITS_AFD
OPTION_V3_FLOW_UNITS_LPS = OPTION_UNITS_LPS
OPTION_V3_FLOW_UNITS_LPM = OPTION_UNITS_LPM
OPTION_V3_FLOW_UNITS_MLD = OPTION_UNITS_MLD
OPTION_V3_FLOW_UNITS_CMH = OPTION_UNITS_CMH
OPTION_V3_FLOW_UNITS_CMD = OPTION_UNITS_CMD

OPTION_V3_PRESSURE_UNITS_PSI = OPTION_PRESSURE_PSI
OPTION_V3_PRESSURE_UNITS_KPA = OPTION_PRESSURE_KPA
OPTION_V3_PRESSURE_UNITS_METERS = OPTION_PRESSURE_METERS

OPTION_V3_HEADLOSS_MODEL_HW = OPTION_HEADLOSS_HW
OPTION_V3_HEADLOSS_MODEL_DW = OPTION_HEADLOSS_DW
OPTION_V3_HEADLOSS_MODEL_CM = OPTION_HEADLOSS_CM

OPTION_V3_STEP_SIZING_FULL = 'FULL'
OPTION_V3_STEP_SIZING_RELAXATION = 'RELAXATION'
OPTION_V3_STEP_SIZING_LINESEARCH = 'LINESEARCH'

OPTION_V3_IF_UNBALANCED_STOP = OPTION_UNBALANCED_STOP
OPTION_V3_IF_UNBALANCED_CONTINUE = OPTION_UNBALANCED_CONTINUE

OPTION_V3_DEMAND_MODEL_FIXED = 'FIXED'
OPTION_V3_DEMAND_MODEL_CONSTRAINED = 'CONSTRAINED'
OPTION_V3_DEMAND_MODEL_POWER = 'POWER'
OPTION_V3_DEMAND_MODEL_LOGISTIC = 'LOGISTIC'

OPTION_V3_LEAKAGE_MODEL_NONE = 'NONE'
OPTION_V3_LEAKAGE_MODEL_POWER = 'POWER'
OPTION_V3_LEAKAGE_MODEL_FAVAD = 'FAVAD'

OPTION_V3_QUALITY_MODEL_NONE = OPTION_QUALITY_NONE
OPTION_V3_QUALITY_MODEL_CHEMICAL = OPTION_QUALITY_CHEMICAL
OPTION_V3_QUALITY_MODEL_AGE = OPTION_QUALITY_AGE
OPTION_V3_QUALITY_MODEL_TRACE = OPTION_QUALITY_TRACE

OPTION_V3_QUALITY_UNITS_HRS = 'HRS'
OPTION_V3_QUALITY_UNITS_PCNT = 'PCNT'
OPTION_V3_QUALITY_UNITS_MGL = 'MG/L'
OPTION_V3_QUALITY_UNITS_UGL = 'UG/L'


def get_option_v3_schema(name: str) -> dict[str, dict[str, Any]]:
    """Return the schema of the EPANET 3 options.

    Args:
        name: Unused; kept so the signature matches the other accessors.

    Returns:
        A dict mapping every supported v3 option key to ``element_schema``
        (the same dict object is shared by all keys).
    """
    keys = (
        'FLOW_UNITS',
        'PRESSURE_UNITS',
        'HEADLOSS_MODEL',
        'SPECIFIC_GRAVITY',
        'SPECIFIC_VISCOSITY',
        'MAXIMUM_TRIALS',
        'HEAD_TOLERANCE',
        'FLOW_TOLERANCE',
        'FLOW_CHANGE_LIMIT',
        'RELATIVE_ACCURACY',
        'TIME_WEIGHT',
        'STEP_SIZING',
        'IF_UNBALANCED',
        'DEMAND_MODEL',
        'DEMAND_PATTERN',
        'DEMAND_MULTIPLIER',
        'MINIMUM_PRESSURE',
        'SERVICE_PRESSURE',
        'PRESSURE_EXPONENT',
        'LEAKAGE_MODEL',
        'LEAKAGE_COEFF1',
        'LEAKAGE_COEFF2',
        'EMITTER_EXPONENT',
        'QUALITY_MODEL',
        'QUALITY_NAME',
        'QUALITY_UNITS',
        'TRACE_NODE',
        'SPECIFIC_DIFFUSIVITY',
        'QUALITY_TOLERANCE',
    )
    return {key: element_schema for key in keys}


def get_option_v3(name: str) -> dict[str, Any]:
    """Read every row of the ``options_v3`` table into a key -> value dict.

    Args:
        name: Forwarded to ``read_all`` as its first argument
            (presumably the project/database name; confirm against
            ``read_all``).

    Returns:
        A dict keyed by each row's ``key`` column; values are converted
        with ``str``.
    """
    rows = read_all(name, "select * from options_v3")
    return {row['key']: str(row['value']) for row in rows}


def _set_option_v3(name: str, cs: ChangeSet) -> DbChangeSet:
    """Build the redo/undo change set for an EPANET 3 option update.

    Only the first operation of *cs* is used, and only keys listed in
    ``get_option_v3_schema`` are applied.

    Raises:
        KeyError: If a requested key is absent from the ``options_v3`` table.
    """
    return _build_option_changeset(
        'options_v3',
        'option_v3',
        get_option_v3(name),
        get_option_v3_schema(name),
        cs.operations[0],
    )


def set_option_v3(name: str, cs: ChangeSet) -> ChangeSet:
    """Apply an EPANET 3 option update through ``execute_command``."""
    return execute_command(name, _set_option_v3(name, cs))


# v2 option key -> v3 option key. Commented-out entries are v2 options
# with no v3 counterpart in this mapping.
_key_map_23 = {
    'UNITS': 'FLOW_UNITS',
    'PRESSURE': 'PRESSURE_UNITS',
    'HEADLOSS': 'HEADLOSS_MODEL',
    'QUALITY': 'QUALITY_MODEL',
    'UNBALANCED': 'IF_UNBALANCED',
    'PATTERN': 'DEMAND_PATTERN',
    'DEMAND MODEL': 'DEMAND_MODEL',
    'DEMAND MULTIPLIER': 'DEMAND_MULTIPLIER',
    'EMITTER EXPONENT': 'EMITTER_EXPONENT',
    'VISCOSITY': 'SPECIFIC_VISCOSITY',
    'DIFFUSIVITY': 'SPECIFIC_DIFFUSIVITY',
    'SPECIFIC GRAVITY': 'SPECIFIC_GRAVITY',
    'TRIALS': 'MAXIMUM_TRIALS',
    'ACCURACY': 'RELATIVE_ACCURACY',
    #'HEADERROR'         : '',
    'FLOWCHANGE': 'FLOW_CHANGE_LIMIT',
    'MINIMUM PRESSURE': 'MINIMUM_PRESSURE',
    'REQUIRED PRESSURE': 'SERVICE_PRESSURE',
    'PRESSURE EXPONENT': 'PRESSURE_EXPONENT',
    'TOLERANCE': 'QUALITY_TOLERANCE',
    'HTOL': 'HEAD_TOLERANCE',
    'QTOL': 'FLOW_TOLERANCE',
    #'RQTOL'             : '',
    #'CHECKFREQ'         : '',
    #'MAXCHECK'          : '',
    #'DAMPLIMIT'         : '',
}


# v3 option key -> v2 option key. Commented-out entries are v3 options
# with no v2 counterpart in this mapping.
_key_map_32 = {
    'FLOW_UNITS': 'UNITS',
    'PRESSURE_UNITS': 'PRESSURE',
    'HEADLOSS_MODEL': 'HEADLOSS',
    'SPECIFIC_GRAVITY': 'SPECIFIC GRAVITY',
    'SPECIFIC_VISCOSITY': 'VISCOSITY',
    'MAXIMUM_TRIALS': 'TRIALS',
    'HEAD_TOLERANCE': 'HTOL',
    'FLOW_TOLERANCE': 'QTOL',
    'FLOW_CHANGE_LIMIT': 'FLOWCHANGE',
    'RELATIVE_ACCURACY': 'ACCURACY',
    #'TIME_WEIGHT'          : '',
    #'STEP_SIZING'          : '',
    'IF_UNBALANCED': 'UNBALANCED',
    'DEMAND_MODEL': 'DEMAND MODEL',
    'DEMAND_PATTERN': 'PATTERN',
    'DEMAND_MULTIPLIER': 'DEMAND MULTIPLIER',
    'MINIMUM_PRESSURE': 'MINIMUM PRESSURE',
    'SERVICE_PRESSURE': 'REQUIRED PRESSURE',
    'PRESSURE_EXPONENT': 'PRESSURE EXPONENT',
    #'LEAKAGE_MODEL'        : '',
    #'LEAKAGE_COEFF1'       : '',
    #'LEAKAGE_COEFF2'       : '',
    'EMITTER_EXPONENT': 'EMITTER EXPONENT',
    'QUALITY_MODEL': 'QUALITY',
    #'QUALITY_NAME'         : '',
    #'QUALITY_UNITS'        : '',
    #'TRACE_NODE'           : '',
    'SPECIFIC_DIFFUSIVITY': 'DIFFUSIVITY',
    'QUALITY_TOLERANCE': 'TOLERANCE',
}


def generate_v2(cs: ChangeSet) -> ChangeSet:
    """Translate a v3 option change set into its v2 equivalent.

    Only the first operation of *cs* is inspected. If it is already of type
    ``'option'`` the input is returned unchanged. Otherwise every key found
    in ``_key_map_32`` is renamed to its v2 key; unmapped keys are dropped.

    Special cases:
        * ``QUALITY_MODEL`` is upper-cased; when it is ``TRACE`` and the
          operation also carries ``TRACE_NODE``, the v2 value becomes
          ``"TRACE <node>"``.
        * ``DEMAND_MODEL`` ``FIXED`` becomes ``DDA``; any other value
          becomes ``PDA``.

    Returns:
        A ``ChangeSet`` built from the translated keys plus
        ``g_update_prefix`` and ``'type': 'option'``, or an empty
        ``ChangeSet()`` when no key could be translated.
    """
    op = cs.operations[0]
    if op['type'] == 'option':
        return cs

    cs_v2 = {}
    for key in op:
        if key in ('operation', 'type') or key not in _key_map_32:
            continue

        v2_key = _key_map_32[key]
        if key == 'QUALITY_MODEL':
            model = str(op[key]).upper()
            if model == OPTION_QUALITY_TRACE and 'TRACE_NODE' in op:
                # v2 stores the trace node inline with the quality keyword.
                cs_v2[v2_key] = f"{OPTION_QUALITY_TRACE} {op['TRACE_NODE']}"
            else:
                cs_v2[v2_key] = model
        elif key == 'DEMAND_MODEL':
            if op[key] == OPTION_V3_DEMAND_MODEL_FIXED:
                cs_v2[v2_key] = OPTION_DEMAND_MODEL_DDA
            else:
                cs_v2[v2_key] = OPTION_DEMAND_MODEL_PDA
        else:
            cs_v2[v2_key] = op[key]

    if not cs_v2:
        return ChangeSet()

    cs_v2 |= g_update_prefix | {'type': 'option'}
    return ChangeSet(cs_v2)


def generate_v3(cs: ChangeSet) -> ChangeSet:
    """Translate a v2 option change set into its v3 equivalent.

    Only the first operation of *cs* is inspected. If it is already of type
    ``'option_v3'`` the input is returned unchanged. Otherwise every key
    found in ``_key_map_23`` is renamed to its v3 key; unmapped keys are
    dropped.

    Special cases:
        * ``QUALITY`` is split on whitespace: the first token (upper-cased)
          becomes ``QUALITY_MODEL``; when that token is ``TRACE`` and a
          second token exists, it is emitted as ``TRACE_NODE``. An empty
          value produces no entry.
        * ``DEMAND MODEL`` ``DDA`` becomes ``FIXED``; any other value
          becomes ``POWER``.

    Returns:
        A ``ChangeSet`` built from the translated keys plus
        ``g_update_prefix`` and ``'type': 'option_v3'``, or an empty
        ``ChangeSet()`` when no key could be translated.
    """
    op = cs.operations[0]
    if op['type'] == 'option_v3':
        return cs

    cs_v3 = {}
    for key in op:
        if key in ('operation', 'type') or key not in _key_map_23:
            continue

        v3_key = _key_map_23[key]
        if key == 'QUALITY':
            tokens = str(op[key]).split()
            if tokens:
                model = tokens[0].upper()
                cs_v3[v3_key] = model
                if model == OPTION_QUALITY_TRACE and len(tokens) >= 2:
                    # v3 keeps the trace node in its own option.
                    cs_v3['TRACE_NODE'] = tokens[1]
        elif key == 'DEMAND MODEL':
            if op[key] == OPTION_DEMAND_MODEL_DDA:
                cs_v3[v3_key] = OPTION_V3_DEMAND_MODEL_FIXED
            else:
                cs_v3[v3_key] = OPTION_V3_DEMAND_MODEL_POWER
        else:
            cs_v3[v3_key] = op[key]

    if not cs_v3:
        return ChangeSet()

    cs_v3 |= g_update_prefix | {'type': 'option_v3'}
    return ChangeSet(cs_v3)