196 lines
7.1 KiB
Python
196 lines
7.1 KiB
Python
from .database import *
|
|
from .s23_options import OPTION_UNITS_CFS, OPTION_UNITS_GPM, OPTION_UNITS_MGD, OPTION_UNITS_IMGD, OPTION_UNITS_AFD, OPTION_UNITS_LPS, OPTION_UNITS_LPM, OPTION_UNITS_MLD, OPTION_UNITS_CMH, OPTION_UNITS_CMD
|
|
from .s23_options import OPTION_PRESSURE_PSI, OPTION_PRESSURE_KPA, OPTION_PRESSURE_M
|
|
from .s23_options import OPTION_HEADLOSS_HW, OPTION_HEADLOSS_DW, OPTION_HEADLOSS_CM
|
|
from .s23_options import OPTION_UNBALANCED_STOP, OPTION_UNBALANCED_CONTINUE
|
|
from .s23_options import OPTION_QUALITY_NONE, OPTION_QUALITY_CHEMICAL, OPTION_QUALITY_AGE, OPTION_QUALITY_TRACE
|
|
from .s23_options import element_schema
|
|
from .s23_options import get_option_schema, set_option
|
|
from .s23_options_util import generate_v2, generate_v3
|
|
|
|
OPTION_V3_FLOW_UNITS_CFS = OPTION_UNITS_CFS
|
|
OPTION_V3_FLOW_UNITS_GPM = OPTION_UNITS_GPM
|
|
OPTION_V3_FLOW_UNITS_MGD = OPTION_UNITS_MGD
|
|
OPTION_V3_FLOW_UNITS_IMGD = OPTION_UNITS_IMGD
|
|
OPTION_V3_FLOW_UNITS_AFD = OPTION_UNITS_AFD
|
|
OPTION_V3_FLOW_UNITS_LPS = OPTION_UNITS_LPS
|
|
OPTION_V3_FLOW_UNITS_LPM = OPTION_UNITS_LPM
|
|
OPTION_V3_FLOW_UNITS_MLD = OPTION_UNITS_MLD
|
|
OPTION_V3_FLOW_UNITS_CMH = OPTION_UNITS_CMH
|
|
OPTION_V3_FLOW_UNITS_CMD = OPTION_UNITS_CMD
|
|
|
|
OPTION_V3_PRESSURE_UNITS_PSI = OPTION_PRESSURE_PSI
|
|
OPTION_V3_PRESSURE_UNITS_KPA = OPTION_PRESSURE_KPA
|
|
OPTION_V3_PRESSURE_UNITS_M = OPTION_PRESSURE_M
|
|
|
|
OPTION_V3_HEADLOSS_MODEL_HW = OPTION_HEADLOSS_HW
|
|
OPTION_V3_HEADLOSS_MODEL_DW = OPTION_HEADLOSS_DW
|
|
OPTION_V3_HEADLOSS_MODEL_CM = OPTION_HEADLOSS_CM
|
|
|
|
OPTION_V3_STEP_SIZING = 'FULL'
|
|
OPTION_V3_STEP_SIZING = 'RELAXATION'
|
|
OPTION_V3_STEP_SIZING = 'LINESEARCH'
|
|
|
|
OPTION_V3_IF_UNBALANCED_STOP = OPTION_UNBALANCED_STOP
|
|
OPTION_V3_IF_UNBALANCED_CONTINUE = OPTION_UNBALANCED_CONTINUE
|
|
|
|
OPTION_V3_DEMAND_MODEL_FIXED = 'FIXED'
|
|
OPTION_V3_DEMAND_MODEL_CONSTRAINED = 'CONSTRAINED'
|
|
OPTION_V3_DEMAND_MODEL_POWER = 'POWER'
|
|
OPTION_V3_DEMAND_MODEL_LOGISTIC = 'LOGISTIC'
|
|
|
|
OPTION_V3_LEAKAGE_MODEL_NONE = 'NONE'
|
|
OPTION_V3_LEAKAGE_MODEL_POWER = 'POWER'
|
|
OPTION_V3_LEAKAGE_MODEL_FAVAD = 'FAVAD'
|
|
|
|
OPTION_V3_QUALITY_MODEL_NONE = OPTION_QUALITY_NONE
|
|
OPTION_V3_QUALITY_MODEL_CHEMICAL = OPTION_QUALITY_CHEMICAL
|
|
OPTION_V3_QUALITY_MODEL_AGE = OPTION_QUALITY_AGE
|
|
OPTION_V3_QUALITY_MODEL_TRACE = OPTION_QUALITY_TRACE
|
|
|
|
OPTION_V3_QUALITY_UNITS_HRS = 'HRS'
|
|
OPTION_V3_QUALITY_UNITS_PCNT = 'PCNT'
|
|
OPTION_V3_QUALITY_UNITS_MGL = 'MG/L'
|
|
OPTION_V3_QUALITY_UNITS_UGL = 'UG/L'
|
|
|
|
|
|
def get_option_v3_schema(name: str) -> dict[str, dict[str, Any]]:
|
|
return { 'FLOW_UNITS' : element_schema,
|
|
'PRESSURE_UNITS' : element_schema,
|
|
'HEADLOSS_MODEL' : element_schema,
|
|
'SPECIFIC_GRAVITY' : element_schema,
|
|
'SPECIFIC_VISCOSITY' : element_schema,
|
|
'MAXIMUM_TRIALS' : element_schema,
|
|
'HEAD_TOLERANCE' : element_schema,
|
|
'FLOW_TOLERANCE' : element_schema,
|
|
'FLOW_CHANGE_LIMIT' : element_schema,
|
|
'RELATIVE_ACCURACY' : element_schema,
|
|
'TIME_WEIGHT' : element_schema,
|
|
'STEP_SIZING' : element_schema,
|
|
'IF_UNBALANCED' : element_schema,
|
|
'DEMAND_MODEL' : element_schema,
|
|
'DEMAND_PATTERN' : element_schema,
|
|
'DEMAND_MULTIPLIER' : element_schema,
|
|
'MINIMUM_PRESSURE' : element_schema,
|
|
'SERVICE_PRESSURE' : element_schema,
|
|
'PRESSURE_EXPONENT' : element_schema,
|
|
'LEAKAGE_MODEL' : element_schema,
|
|
'LEAKAGE_COEFF1' : element_schema,
|
|
'LEAKAGE_COEFF2' : element_schema,
|
|
'EMITTER_EXPONENT' : element_schema,
|
|
'QUALITY_MODEL' : element_schema,
|
|
'QUALITY_NAME' : element_schema,
|
|
'QUALITY_UNITS' : element_schema,
|
|
'TRACE_NODE' : element_schema,
|
|
'SPECIFIC_DIFFUSIVITY' : element_schema,
|
|
'QUALITY_TOLERANCE' : element_schema }
|
|
|
|
|
|
def get_option_v3(name: str) -> dict[str, Any]:
|
|
ts = read_all(name, f"select * from options_v3")
|
|
d = {}
|
|
for e in ts:
|
|
d[e['key']] = str(e['value'])
|
|
return d
|
|
|
|
|
|
def set_option_v3_cmd(name: str, cs: ChangeSet) -> DbChangeSet:
|
|
raw_old = get_option_v3(name)
|
|
|
|
old = {}
|
|
new = {}
|
|
|
|
new_dict = cs.operations[0]
|
|
schema = get_option_v3_schema(name)
|
|
for key in schema.keys():
|
|
if key in new_dict:
|
|
old[key] = str(raw_old[key])
|
|
new[key] = str(new_dict[key])
|
|
|
|
redo_cs = g_update_prefix | { 'type' : 'option_v3' }
|
|
|
|
redo_sql = ''
|
|
for key, value in new.items():
|
|
if redo_sql != '':
|
|
redo_sql += '\n'
|
|
redo_sql += f"update options_v3 set value = '{value}' where key = '{key}';"
|
|
redo_cs |= { key: value }
|
|
|
|
undo_cs = g_update_prefix | { 'type' : 'option_v3' }
|
|
|
|
undo_sql = ''
|
|
for key, value in old.items():
|
|
if undo_sql != '':
|
|
undo_sql += '\n'
|
|
undo_sql += f"update options_v3 set value = '{value}' where key = '{key}';"
|
|
undo_cs |= { key: value }
|
|
|
|
return DbChangeSet(redo_sql, undo_sql, [redo_cs], [undo_cs])
|
|
|
|
|
|
def set_option_v3(name: str, cs: ChangeSet, update_v2: bool = True) -> ChangeSet:
|
|
v3 = set_option_v3_cmd(name, cs)
|
|
result = execute_command(name, v3)
|
|
if update_v2:
|
|
v2 = generate_v2(ChangeSet(v3.redo_cs[0]))
|
|
result.merge(set_option(name, v2, False))
|
|
return result
|
|
|
|
|
|
def _parse_v2(v2_lines: list[str]) -> dict[str, str]:
|
|
cs_v2 = g_update_prefix | { 'type' : 'option' }
|
|
for s in v2_lines:
|
|
tokens = s.split()
|
|
if tokens[0].upper() == 'PATTERN': # can not upper id
|
|
cs_v2 |= { 'PATTERN' : tokens[1] }
|
|
elif tokens[0].upper() == 'QUALITY': # can not upper trace node
|
|
value = tokens[1]
|
|
if len(tokens) > 2:
|
|
value += f' {tokens[2]}'
|
|
cs_v2 |= { 'QUALITY' : value }
|
|
else:
|
|
line = s.upper().strip()
|
|
for key in get_option_schema('').keys():
|
|
if line.startswith(key):
|
|
value = line.removeprefix(key).strip()
|
|
cs_v2 |= { key : value }
|
|
return cs_v2
|
|
|
|
|
|
def inp_in_option_v3(section: list[str]) -> ChangeSet:
|
|
if len(section) <= 0:
|
|
return ChangeSet()
|
|
|
|
cs_v3 = g_update_prefix | { 'type' : 'option_v3' }
|
|
v2_lines = []
|
|
for s in section:
|
|
if s.startswith(';'):
|
|
continue
|
|
|
|
tokens = s.strip().split()
|
|
key = tokens[0]
|
|
if key in get_option_v3_schema('').keys():
|
|
value = tokens[1] if len(tokens) >= 2 else ''
|
|
cs_v3 |= { key : value }
|
|
else:
|
|
v2_lines.append(s.strip())
|
|
|
|
# unlikely...
|
|
cs_v2 = _parse_v2(v2_lines)
|
|
|
|
result = ChangeSet(cs_v3)
|
|
result.merge(generate_v3(ChangeSet(cs_v2)))
|
|
result.merge(generate_v2(result))
|
|
return result
|
|
|
|
|
|
def inp_out_option_v3(name: str) -> list[str]:
|
|
lines = []
|
|
objs = read_all(name, f"select * from options_v3")
|
|
for obj in objs:
|
|
key = obj['key']
|
|
value = obj['value']
|
|
if value != '':
|
|
lines.append(f'{key} {value}')
|
|
return lines
|