113 lines
3.6 KiB
Python
113 lines
3.6 KiB
Python
from .database import *
|
|
|
|
# Named values for the STATISTIC time option.
TIME_STATISTIC_NONE = "NONE"
TIME_STATISTIC_AVERAGED = "AVERAGED"
TIME_STATISTIC_MINIMUM = "MINIMUM"
TIME_STATISTIC_MAXIMUM = "MAXIMUM"
TIME_STATISTIC_RANGE = "RANGE"
|
|
|
|
# Schema template used for every time option: type 'str', optional, not read-only.
element_schema = {'type': 'str', 'optional': True, 'readonly': False}


def get_time_schema(name: str) -> dict[str, dict[str, Any]]:
    """Return the schema of every supported time option.

    Args:
        name: Not used by this function; the same schema is returned for
            every value.

    Returns:
        A dict mapping each option key (for example ``'DURATION'``) to its
        own copy of ``element_schema``. Keys are always in the same order.
    """
    keys = (
        'DURATION',
        'HYDRAULIC TIMESTEP',
        'QUALITY TIMESTEP',
        'RULE TIMESTEP',
        'PATTERN TIMESTEP',
        'PATTERN START',
        'REPORT TIMESTEP',
        'REPORT START',
        'START CLOCKTIME',
        'STATISTIC',
    )
    # Hand out a fresh dict per key: sharing the module-level template would
    # let a caller that mutates one entry silently change every other key and
    # every later call.
    return {key: dict(element_schema) for key in keys}
|
|
|
|
|
|
def get_time(name: str) -> dict[str, Any]:
    """Load the ``times`` table as a ``{key: value}`` dict.

    Args:
        name: Forwarded to ``read_all`` as its first argument.

    Returns:
        One entry per row returned by ``read_all``; each value is converted
        with ``str``.
    """
    rows = read_all(name, "select * from times")
    return {row['key']: str(row['value']) for row in rows}
|
|
|
|
|
|
def _set_time(name: str, cs: ChangeSet) -> DbChangeSet:
    """Build the redo/undo SQL and change sets for a time-option update.

    Only keys that are present both in the time schema and in the first
    operation of ``cs`` are updated; any other entry of the operation is
    ignored.

    Args:
        name: Forwarded to ``get_time`` / ``get_time_schema``.
        cs: Change set whose first operation carries the new option values.

    Returns:
        ``DbChangeSet(redo_sql, undo_sql, [redo_cs], [undo_cs])`` where the
        redo side holds the requested values and the undo side holds the
        values currently returned by ``get_time``.

    Raises:
        KeyError: If a requested key is missing from the current ``times`` rows.
    """
    current = get_time(name)
    requested = cs.operations[0]

    touched = [key for key in get_time_schema(name) if key in requested]
    old = {key: str(current[key]) for key in touched}
    new = {key: str(requested[key]) for key in touched}

    def build_sql(values: dict[str, str]) -> str:
        # One UPDATE per option, newline-separated.
        statements = []
        for key, value in values.items():
            # The value is embedded in a SQL string literal, so double any
            # single quote; otherwise a value containing "'" would terminate
            # the literal early and corrupt (or inject into) the statement.
            escaped = value.replace("'", "''")
            statements.append(f"update times set value = '{escaped}' where key = '{key}';")
        return '\n'.join(statements)

    header = g_update_prefix | { 'type' : 'time' }
    # The change sets keep the raw (unescaped) values; escaping is SQL-only.
    return DbChangeSet(build_sql(new), build_sql(old), [header | new], [header | old])
|
|
|
|
|
|
def set_time(name: str, cs: ChangeSet) -> ChangeSet:
    """Apply a time-option change set.

    Builds the database change set with ``_set_time`` and returns whatever
    ``execute_command`` returns for it.
    """
    command = _set_time(name, cs)
    return execute_command(name, command)
|
|
|
|
|
|
#--------------------------------------------------------------
|
|
# [EPA2][EPA3]
|
|
# STATISTIC {NONE/AVERAGED/MINIMUM/MAXIMUM/RANGE}
|
|
# DURATION value (units)
|
|
# HYDRAULIC TIMESTEP value (units)
|
|
# QUALITY TIMESTEP value (units)
|
|
# RULE TIMESTEP value (units)
|
|
# PATTERN TIMESTEP value (units)
|
|
# PATTERN START value (units)
|
|
# REPORT TIMESTEP value (units)
|
|
# REPORT START value (units)
|
|
# START CLOCKTIME value (AM PM)
|
|
# [EPA3] supports [EPA2] keyword
|
|
#--------------------------------------------------------------
|
|
|
|
|
|
def inp_in_time(section: list[str]) -> str:
    """Translate the lines of an INP time-options section into SQL updates.

    Each line is upper-cased and matched against the keys of
    ``get_time_schema``; the remainder of the line becomes the new value.
    ``TOTAL DURATION`` is accepted as an alias of ``DURATION``. Lines that
    match no key are ignored.

    Args:
        section: Raw lines of the section (presumably ``[TIMES]`` -- confirm
            against the caller), without the section header.

    Returns:
        The concatenated ``update times ...;`` statements, or ``''`` when no
        line matched.
    """
    # The schema is the same for every line, so fetch its keys once.
    keys = tuple(get_time_schema(''))

    statements = []
    for raw in section:
        # ';' starts a comment that runs to the end of the line. Cutting there
        # drops full-line comments (indented ones too) and keeps a trailing
        # comment from being stored as part of the value.
        line = raw.partition(';')[0].strip().upper()
        if not line:
            continue

        # TOTAL DURATION => DURATION
        if line.startswith('TOTAL DURATION'):
            line = line.removeprefix('TOTAL ')

        for key in keys:
            if line.startswith(key):
                # Double single quotes so the value stays inside its SQL
                # string literal.
                value = line.removeprefix(key).strip().replace("'", "''")
                statements.append(f"update times set value = '{value}' where key = '{key}';")
                # No schema key is a prefix of another, so at most one matches.
                break
    return ''.join(statements)
|
|
|
|
|
|
def inp_out_time(name: str) -> list[str]:
    """Render every row of the ``times`` table as a ``'<key> <value>'`` line.

    Args:
        name: Forwarded to ``read_all`` as its first argument.

    Returns:
        One string per row, in the order ``read_all`` returns them.
    """
    rows = read_all(name, "select * from times")
    return [f"{row['key']} {row['value']}" for row in rows]
|