from .database import * def get_demand_schema(name: str) -> dict[str, dict[str, Any]]: return { 'junction' : {'type': 'str' , 'optional': False , 'readonly': True }, 'demands' : {'type': 'list' , 'optional': False , 'readonly': False, 'element': { 'demand' : {'type': 'float' , 'optional': False , 'readonly': False }, 'pattern' : {'type': 'str' , 'optional': True , 'readonly': False }, 'category': {'type': 'str' , 'optional': True , 'readonly': False }}}} def get_demand(name: str, junction: str) -> dict[str, Any]: des = read_all(name, f"select * from demands where junction = '{junction}' order by _order") ds = [] for r in des: d = {} d['demand'] = float(r['demand']) d['pattern'] = str(r['pattern']) if r['pattern'] != None else None d['category'] = str(r['category']) if r['category'] != None else None ds.append(d) return { 'junction': junction, 'demands': ds } def _set_demand(name: str, cs: ChangeSet) -> DbChangeSet: junction = cs.operations[0]['junction'] old = get_demand(name, junction) new = { 'junction': junction, 'demands': [] } f_junction = f"'{junction}'" # TODO: transaction ? redo_sql = f"delete from demands where junction = {f_junction};" for r in cs.operations[0]['demands']: demand = float(r['demand']) pattern = str(r['pattern']) if 'pattern' in r and r['pattern'] != None else None category = str(r['category']) if 'category' in r and r['category'] != None else None f_demand = demand f_pattern = f"'{pattern}'" if pattern != None else 'null' f_category = f"'{category}'" if category != None else 'null' redo_sql += f"\ninsert into demands (junction, demand, pattern, category) values ({f_junction}, {f_demand}, {f_pattern}, {f_category});" new['demands'].append({ 'demand': demand, 'pattern': pattern, 'category': category }) undo_sql = f"delete from demands where junction = {f_junction};" for r in old['demands']: demand = float(r['demand']) pattern = str(r['pattern']) if 'pattern' in r and r['pattern'] != None else None category = str(r['category']) if 'category' in r and r['category'] != None else None f_demand = demand f_pattern = f"'{pattern}'" if pattern != None else 'null' f_category = f"'{category}'" if category != None else 'null' undo_sql += f"\ninsert into demands (junction, demand, pattern, category) values ({f_junction}, {f_demand}, {f_pattern}, {f_category});" redo_cs = g_update_prefix | { 'type': 'demand' } | new undo_cs = g_update_prefix | { 'type': 'demand' } | old return DbChangeSet(redo_sql, undo_sql, [redo_cs], [undo_cs]) def set_demand(name: str, cs: ChangeSet) -> ChangeSet: return execute_command(name, _set_demand(name, cs)) #-------------------------------------------------------------- # [EPA2][EPA3][IN][OUT] # node base_demand (pattern) ;category #-------------------------------------------------------------- def inp_in_demand(line: str) -> str: tokens = line.split() num = len(tokens) has_desc = tokens[-1].startswith(';') num_without_desc = (num - 1) if has_desc else num junction = str(tokens[0]) demand = float(tokens[1]) pattern = str(tokens[2]) if num_without_desc >= 3 else None pattern = f"'{pattern}'" if pattern != None else 'null' category = str(tokens[3]) if num_without_desc >= 4 else None category = f"'{category}'" if category != None else 'null' return f"insert into demands (junction, demand, pattern, category) values ('{junction}', {demand}, {pattern}, {category});" def inp_out_demand(name: str) -> list[str]: lines = [] objs = read_all(name, f"select * from demands order by _order") for obj in objs: junction = obj['junction'] demand = obj['demand'] pattern = obj['pattern'] if obj['pattern'] != None else '' category = f";{obj['category']}" if obj['category'] != None else ';' lines.append(f'{junction} {demand} {pattern} {category}') return lines def delete_demand_by_junction(name: str, junction: str) -> ChangeSet: row = try_read(name, f"select * from demands where junction = '{junction}'") if row == None: return ChangeSet() return ChangeSet(g_update_prefix | {'type': 'demand', 'junction': junction, 'demands': []}) def unset_demand_by_pattern(name: str, pattern: str) -> ChangeSet: cs = ChangeSet() rows = read_all(name, f"select distinct junction from demands where pattern = '{pattern}'") for row in rows: ds = get_demand(name, row['junction']) for d in ds['demands']: d['pattern'] = None cs.append(g_update_prefix | {'type': 'demand', 'junction': row['junction'], 'demands': ds['demands']}) return cs