Support "TOTAL DURATION"
This commit is contained in:
219
api/s21_times.py
219
api/s21_times.py
@@ -1,107 +1,112 @@
|
||||
from .database import *
|
||||
|
||||
TIME_STATISTIC_NONE = 'NONE'
|
||||
TIME_STATISTIC_AVERAGED = 'AVERAGED'
|
||||
TIME_STATISTIC_MINIMUM = 'MINIMUM'
|
||||
TIME_STATISTIC_MAXIMUM = 'MAXIMUM'
|
||||
TIME_STATISTIC_RANGE = 'RANGE'
|
||||
|
||||
element_schema = {'type': 'str' , 'optional': True , 'readonly': False}
|
||||
|
||||
def get_time_schema(name: str) -> dict[str, dict[str, Any]]:
|
||||
return { 'DURATION' : element_schema,
|
||||
'HYDRAULIC TIMESTEP' : element_schema,
|
||||
'QUALITY TIMESTEP' : element_schema,
|
||||
'RULE TIMESTEP' : element_schema,
|
||||
'PATTERN TIMESTEP' : element_schema,
|
||||
'PATTERN START' : element_schema,
|
||||
'REPORT TIMESTEP' : element_schema,
|
||||
'REPORT START' : element_schema,
|
||||
'START CLOCKTIME' : element_schema,
|
||||
'STATISTIC' : element_schema}
|
||||
|
||||
|
||||
def get_time(name: str) -> dict[str, Any]:
|
||||
ts = read_all(name, f"select * from times")
|
||||
d = {}
|
||||
for e in ts:
|
||||
d[e['key']] = str(e['value'])
|
||||
return d
|
||||
|
||||
|
||||
def _set_time(name: str, cs: ChangeSet) -> DbChangeSet:
|
||||
raw_old = get_time(name)
|
||||
|
||||
old = {}
|
||||
new = {}
|
||||
|
||||
new_dict = cs.operations[0]
|
||||
schema = get_time_schema(name)
|
||||
for key in schema.keys():
|
||||
if key in new_dict:
|
||||
old[key] = str(raw_old[key])
|
||||
new[key] = str(new_dict[key])
|
||||
|
||||
redo_cs = g_update_prefix | { 'type' : 'time' }
|
||||
|
||||
redo_sql = ''
|
||||
for key, value in new.items():
|
||||
if redo_sql != '':
|
||||
redo_sql += '\n'
|
||||
redo_sql += f"update times set value = '{value}' where key = '{key}';"
|
||||
redo_cs |= { key: value }
|
||||
|
||||
undo_cs = g_update_prefix | { 'type' : 'time' }
|
||||
|
||||
undo_sql = ''
|
||||
for key, value in old.items():
|
||||
if undo_sql != '':
|
||||
undo_sql += '\n'
|
||||
undo_sql += f"update times set value = '{value}' where key = '{key}';"
|
||||
undo_cs |= { key: value }
|
||||
|
||||
return DbChangeSet(redo_sql, undo_sql, [redo_cs], [undo_cs])
|
||||
|
||||
|
||||
def set_time(name: str, cs: ChangeSet) -> ChangeSet:
|
||||
return execute_command(name, _set_time(name, cs))
|
||||
|
||||
|
||||
#--------------------------------------------------------------
|
||||
# [EPA2][EPA3]
|
||||
# STATISTIC {NONE/AVERAGE/MIN/MAX/RANGE}
|
||||
# DURATION value (units)
|
||||
# HYDRAULIC TIMESTEP value (units)
|
||||
# QUALITY TIMESTEP value (units)
|
||||
# RULE TIMESTEP value (units)
|
||||
# PATTERN TIMESTEP value (units)
|
||||
# PATTERN START value (units)
|
||||
# REPORT TIMESTEP value (units)
|
||||
# REPORT START value (units)
|
||||
# START CLOCKTIME value (AM PM)
|
||||
# [EPA3] supports [EPA2] keyword
|
||||
#--------------------------------------------------------------
|
||||
|
||||
|
||||
def inp_in_time(section: list[str]) -> str:
|
||||
sql = ''
|
||||
for s in section:
|
||||
if s.startswith(';'):
|
||||
continue
|
||||
|
||||
line = s.upper().strip()
|
||||
for key in get_time_schema('').keys():
|
||||
if line.startswith(key):
|
||||
value = line.removeprefix(key).strip()
|
||||
sql += f"update times set value = '{value}' where key = '{key}';"
|
||||
return sql
|
||||
|
||||
|
||||
def inp_out_time(name: str) -> list[str]:
|
||||
lines = []
|
||||
objs = read_all(name, f"select * from times")
|
||||
for obj in objs:
|
||||
key = obj['key']
|
||||
value = obj['value']
|
||||
lines.append(f'{key} {value}')
|
||||
return lines
|
||||
from .database import *
|
||||
|
||||
TIME_STATISTIC_NONE = 'NONE'
|
||||
TIME_STATISTIC_AVERAGED = 'AVERAGED'
|
||||
TIME_STATISTIC_MINIMUM = 'MINIMUM'
|
||||
TIME_STATISTIC_MAXIMUM = 'MAXIMUM'
|
||||
TIME_STATISTIC_RANGE = 'RANGE'
|
||||
|
||||
element_schema = {'type': 'str' , 'optional': True , 'readonly': False}
|
||||
|
||||
def get_time_schema(name: str) -> dict[str, dict[str, Any]]:
|
||||
return { 'DURATION' : element_schema,
|
||||
'HYDRAULIC TIMESTEP' : element_schema,
|
||||
'QUALITY TIMESTEP' : element_schema,
|
||||
'RULE TIMESTEP' : element_schema,
|
||||
'PATTERN TIMESTEP' : element_schema,
|
||||
'PATTERN START' : element_schema,
|
||||
'REPORT TIMESTEP' : element_schema,
|
||||
'REPORT START' : element_schema,
|
||||
'START CLOCKTIME' : element_schema,
|
||||
'STATISTIC' : element_schema}
|
||||
|
||||
|
||||
def get_time(name: str) -> dict[str, Any]:
|
||||
ts = read_all(name, f"select * from times")
|
||||
d = {}
|
||||
for e in ts:
|
||||
d[e['key']] = str(e['value'])
|
||||
return d
|
||||
|
||||
|
||||
def _set_time(name: str, cs: ChangeSet) -> DbChangeSet:
|
||||
raw_old = get_time(name)
|
||||
|
||||
old = {}
|
||||
new = {}
|
||||
|
||||
new_dict = cs.operations[0]
|
||||
schema = get_time_schema(name)
|
||||
for key in schema.keys():
|
||||
if key in new_dict:
|
||||
old[key] = str(raw_old[key])
|
||||
new[key] = str(new_dict[key])
|
||||
|
||||
redo_cs = g_update_prefix | { 'type' : 'time' }
|
||||
|
||||
redo_sql = ''
|
||||
for key, value in new.items():
|
||||
if redo_sql != '':
|
||||
redo_sql += '\n'
|
||||
redo_sql += f"update times set value = '{value}' where key = '{key}';"
|
||||
redo_cs |= { key: value }
|
||||
|
||||
undo_cs = g_update_prefix | { 'type' : 'time' }
|
||||
|
||||
undo_sql = ''
|
||||
for key, value in old.items():
|
||||
if undo_sql != '':
|
||||
undo_sql += '\n'
|
||||
undo_sql += f"update times set value = '{value}' where key = '{key}';"
|
||||
undo_cs |= { key: value }
|
||||
|
||||
return DbChangeSet(redo_sql, undo_sql, [redo_cs], [undo_cs])
|
||||
|
||||
|
||||
def set_time(name: str, cs: ChangeSet) -> ChangeSet:
|
||||
return execute_command(name, _set_time(name, cs))
|
||||
|
||||
|
||||
#--------------------------------------------------------------
|
||||
# [EPA2][EPA3]
|
||||
# STATISTIC {NONE/AVERAGE/MIN/MAX/RANGE}
|
||||
# DURATION value (units)
|
||||
# HYDRAULIC TIMESTEP value (units)
|
||||
# QUALITY TIMESTEP value (units)
|
||||
# RULE TIMESTEP value (units)
|
||||
# PATTERN TIMESTEP value (units)
|
||||
# PATTERN START value (units)
|
||||
# REPORT TIMESTEP value (units)
|
||||
# REPORT START value (units)
|
||||
# START CLOCKTIME value (AM PM)
|
||||
# [EPA3] supports [EPA2] keyword
|
||||
#--------------------------------------------------------------
|
||||
|
||||
|
||||
def inp_in_time(section: list[str]) -> str:
|
||||
sql = ''
|
||||
for s in section:
|
||||
if s.startswith(';'):
|
||||
continue
|
||||
|
||||
line = s.upper().strip()
|
||||
|
||||
# TOTAL DURATION => DURATION
|
||||
if line.startswith('TOTAL DURATION'):
|
||||
line = line.replace('TOTAL DURATION', 'DURATION')
|
||||
|
||||
for key in get_time_schema('').keys():
|
||||
if line.startswith(key):
|
||||
value = line.removeprefix(key).strip()
|
||||
sql += f"update times set value = '{value}' where key = '{key}';"
|
||||
return sql
|
||||
|
||||
|
||||
def inp_out_time(name: str) -> list[str]:
|
||||
lines = []
|
||||
objs = read_all(name, f"select * from times")
|
||||
for obj in objs:
|
||||
key = obj['key']
|
||||
value = obj['value']
|
||||
lines.append(f'{key} {value}')
|
||||
return lines
|
||||
|
||||
Reference in New Issue
Block a user