diff --git a/api/s21_times.py b/api/s21_times.py index 16910d9..44520d8 100644 --- a/api/s21_times.py +++ b/api/s21_times.py @@ -1,107 +1,112 @@ -from .database import * - -TIME_STATISTIC_NONE = 'NONE' -TIME_STATISTIC_AVERAGED = 'AVERAGED' -TIME_STATISTIC_MINIMUM = 'MINIMUM' -TIME_STATISTIC_MAXIMUM = 'MAXIMUM' -TIME_STATISTIC_RANGE = 'RANGE' - -element_schema = {'type': 'str' , 'optional': True , 'readonly': False} - -def get_time_schema(name: str) -> dict[str, dict[str, Any]]: - return { 'DURATION' : element_schema, - 'HYDRAULIC TIMESTEP' : element_schema, - 'QUALITY TIMESTEP' : element_schema, - 'RULE TIMESTEP' : element_schema, - 'PATTERN TIMESTEP' : element_schema, - 'PATTERN START' : element_schema, - 'REPORT TIMESTEP' : element_schema, - 'REPORT START' : element_schema, - 'START CLOCKTIME' : element_schema, - 'STATISTIC' : element_schema} - - -def get_time(name: str) -> dict[str, Any]: - ts = read_all(name, f"select * from times") - d = {} - for e in ts: - d[e['key']] = str(e['value']) - return d - - -def _set_time(name: str, cs: ChangeSet) -> DbChangeSet: - raw_old = get_time(name) - - old = {} - new = {} - - new_dict = cs.operations[0] - schema = get_time_schema(name) - for key in schema.keys(): - if key in new_dict: - old[key] = str(raw_old[key]) - new[key] = str(new_dict[key]) - - redo_cs = g_update_prefix | { 'type' : 'time' } - - redo_sql = '' - for key, value in new.items(): - if redo_sql != '': - redo_sql += '\n' - redo_sql += f"update times set value = '{value}' where key = '{key}';" - redo_cs |= { key: value } - - undo_cs = g_update_prefix | { 'type' : 'time' } - - undo_sql = '' - for key, value in old.items(): - if undo_sql != '': - undo_sql += '\n' - undo_sql += f"update times set value = '{value}' where key = '{key}';" - undo_cs |= { key: value } - - return DbChangeSet(redo_sql, undo_sql, [redo_cs], [undo_cs]) - - -def set_time(name: str, cs: ChangeSet) -> ChangeSet: - return execute_command(name, _set_time(name, cs)) - - -#-------------------------------------------------------------- -# [EPA2][EPA3] -# STATISTIC {NONE/AVERAGE/MIN/MAX/RANGE} -# DURATION value (units) -# HYDRAULIC TIMESTEP value (units) -# QUALITY TIMESTEP value (units) -# RULE TIMESTEP value (units) -# PATTERN TIMESTEP value (units) -# PATTERN START value (units) -# REPORT TIMESTEP value (units) -# REPORT START value (units) -# START CLOCKTIME value (AM PM) -# [EPA3] supports [EPA2] keyword -#-------------------------------------------------------------- - - -def inp_in_time(section: list[str]) -> str: - sql = '' - for s in section: - if s.startswith(';'): - continue - - line = s.upper().strip() - for key in get_time_schema('').keys(): - if line.startswith(key): - value = line.removeprefix(key).strip() - sql += f"update times set value = '{value}' where key = '{key}';" - return sql - - -def inp_out_time(name: str) -> list[str]: - lines = [] - objs = read_all(name, f"select * from times") - for obj in objs: - key = obj['key'] - value = obj['value'] - lines.append(f'{key} {value}') - return lines +from .database import * + +TIME_STATISTIC_NONE = 'NONE' +TIME_STATISTIC_AVERAGED = 'AVERAGED' +TIME_STATISTIC_MINIMUM = 'MINIMUM' +TIME_STATISTIC_MAXIMUM = 'MAXIMUM' +TIME_STATISTIC_RANGE = 'RANGE' + +element_schema = {'type': 'str' , 'optional': True , 'readonly': False} + +def get_time_schema(name: str) -> dict[str, dict[str, Any]]: + return { 'DURATION' : element_schema, + 'HYDRAULIC TIMESTEP' : element_schema, + 'QUALITY TIMESTEP' : element_schema, + 'RULE TIMESTEP' : element_schema, + 'PATTERN TIMESTEP' : element_schema, + 'PATTERN START' : element_schema, + 'REPORT TIMESTEP' : element_schema, + 'REPORT START' : element_schema, + 'START CLOCKTIME' : element_schema, + 'STATISTIC' : element_schema} + + +def get_time(name: str) -> dict[str, Any]: + ts = read_all(name, f"select * from times") + d = {} + for e in ts: + d[e['key']] = str(e['value']) + return d + + +def _set_time(name: str, cs: ChangeSet) -> DbChangeSet: + raw_old = get_time(name) + + old = {} + new = {} + + new_dict = cs.operations[0] + schema = get_time_schema(name) + for key in schema.keys(): + if key in new_dict: + old[key] = str(raw_old[key]) + new[key] = str(new_dict[key]) + + redo_cs = g_update_prefix | { 'type' : 'time' } + + redo_sql = '' + for key, value in new.items(): + if redo_sql != '': + redo_sql += '\n' + redo_sql += f"update times set value = '{value}' where key = '{key}';" + redo_cs |= { key: value } + + undo_cs = g_update_prefix | { 'type' : 'time' } + + undo_sql = '' + for key, value in old.items(): + if undo_sql != '': + undo_sql += '\n' + undo_sql += f"update times set value = '{value}' where key = '{key}';" + undo_cs |= { key: value } + + return DbChangeSet(redo_sql, undo_sql, [redo_cs], [undo_cs]) + + +def set_time(name: str, cs: ChangeSet) -> ChangeSet: + return execute_command(name, _set_time(name, cs)) + + +#-------------------------------------------------------------- +# [EPA2][EPA3] +# STATISTIC {NONE/AVERAGE/MIN/MAX/RANGE} +# DURATION value (units) +# HYDRAULIC TIMESTEP value (units) +# QUALITY TIMESTEP value (units) +# RULE TIMESTEP value (units) +# PATTERN TIMESTEP value (units) +# PATTERN START value (units) +# REPORT TIMESTEP value (units) +# REPORT START value (units) +# START CLOCKTIME value (AM PM) +# [EPA3] supports [EPA2] keyword +#-------------------------------------------------------------- + + +def inp_in_time(section: list[str]) -> str: + sql = '' + for s in section: + if s.startswith(';'): + continue + + line = s.upper().strip() + + # TOTAL DURATION => DURATION + if line.startswith('TOTAL DURATION'): + line = line.replace('TOTAL DURATION', 'DURATION') + + for key in get_time_schema('').keys(): + if line.startswith(key): + value = line.removeprefix(key).strip() + sql += f"update times set value = '{value}' where key = '{key}';" + return sql + + +def inp_out_time(name: str) -> list[str]: + lines = [] + objs = read_all(name, f"select * from times") + for obj in objs: + key = obj['key'] + value = obj['value'] + lines.append(f'{key} {value}') + return lines