from .database import *


def _scada_quote(value: object) -> str:
    """Render *value* as a single-quoted SQL string literal.

    Embedded single quotes are doubled so the value cannot end the literal
    early (which would break the statement or allow SQL injection).

    NOTE(review): assumes the backend treats backslashes literally inside
    '...' literals (standard SQL behaviour) -- confirm for the target DB.
    """
    return "'" + str(value).replace("'", "''") + "'"


def _scada_row_filter(device_id: object, time: object) -> str:
    """Return the ``where`` condition that matches one (device_id, time) row."""
    return f"device_id = {_scada_quote(device_id)} and time = {_scada_quote(time)}"


def _scada_select_row_sql(device_id: object, time: object) -> str:
    """Return the query that selects one (device_id, time) row."""
    return f"select * from scada_device_data where {_scada_row_filter(device_id, time)}"


def _scada_delete_row_sql(device_id: object, time: object) -> str:
    """Return the statement that deletes one (device_id, time) row."""
    return f"delete from scada_device_data where {_scada_row_filter(device_id, time)};"


def _scada_insert_sql(device_id: object, time: object, value: float) -> str:
    """Return the statement that inserts one (device_id, time, value) row."""
    return (
        "insert into scada_device_data (device_id, time, value) "
        f"values ({_scada_quote(device_id)}, {_scada_quote(time)}, {value});"
    )


def _scada_replace_all_sql(device_id: object, samples: list) -> str:
    """Return SQL that replaces every row of *device_id* with *samples*.

    The script is one ``delete`` for the device followed by one ``insert`` per
    sample, newline-separated. Each sample is a dict with ``'time'`` and
    ``'value'`` keys.
    """
    statements = [
        f"delete from scada_device_data where device_id = {_scada_quote(device_id)};"
    ]
    statements.extend(
        _scada_insert_sql(device_id, sample['time'], sample['value'])
        for sample in samples
    )
    return "\n".join(statements)


def get_scada_device_data_schema(name: str) -> dict[str, dict[str, Any]]:
    """Return the field schema of a ``scada_device_data`` object.

    *name* is accepted for signature parity with the other accessors and is
    not used here.
    """
    return {
        'device_id': {'type': 'str', 'optional': False, 'readonly': True},
        'data': {
            'type': 'list',
            'optional': False,
            'readonly': False,
            'element': {
                'time': {'type': 'str', 'optional': False, 'readonly': False},
                'value': {'type': 'float', 'optional': False, 'readonly': False},
            },
        },
    }


def get_scada_device_data(name: str, device_id: str) -> dict[str, Any]:
    """Read all samples stored for *device_id*, ordered by time.

    Returns ``{'device_id': device_id, 'data': [{'time': str, 'value': float}, ...]}``.
    *name* is forwarded to ``read_all`` (presumably it selects the target
    database/project -- confirm against ``.database``).
    """
    rows = read_all(
        name,
        f"select * from scada_device_data where device_id = {_scada_quote(device_id)} order by time",
    )
    samples = [{'time': str(r['time']), 'value': float(r['value'])} for r in rows]
    return {'device_id': device_id, 'data': samples}


def _set_scada_device_data(name: str, cs: ChangeSet) -> DbChangeSet:
    """Build the redo/undo change set that replaces a device's whole data list.

    Only ``cs.operations[0]`` is consulted; it must provide ``'device_id'`` and
    ``'data'`` (an iterable of dicts with ``'time'`` and ``'value'``). The
    current rows are read first so the undo side can restore them.
    """
    operation = cs.operations[0]
    device_id = operation['device_id']

    old = get_scada_device_data(name, device_id)
    new = {
        'device_id': device_id,
        # Normalise to the same str/float shapes that get_scada_device_data returns.
        'data': [
            {'time': str(tv['time']), 'value': float(tv['value'])}
            for tv in operation['data']
        ],
    }

    # TODO: transaction ?
    redo_sql = _scada_replace_all_sql(device_id, new['data'])
    undo_sql = _scada_replace_all_sql(device_id, old['data'])

    redo_cs = g_update_prefix | {'type': 'scada_device_data'} | new
    undo_cs = g_update_prefix | {'type': 'scada_device_data'} | old

    return DbChangeSet(redo_sql, undo_sql, [redo_cs], [undo_cs])


def set_scada_device_data(name: str, cs: ChangeSet) -> ChangeSet:
    """Replace all samples of the device named in ``cs.operations[0]``.

    Returns whatever ``execute_command`` returns for the built change set.
    """
    return execute_command(name, _set_scada_device_data(name, cs), False)


def _add_scada_device_data(name: str, cs: ChangeSet) -> DbChangeSet:
    """Build the redo/undo change set that inserts one sample.

    Only ``cs.operations[0]`` is consulted; it must provide ``'device_id'``,
    ``'time'`` and ``'value'``. *name* is not used by this builder.
    """
    values = cs.operations[0]
    device_id = values['device_id']
    time = values['time']
    value = float(values['value'])

    redo_sql = _scada_insert_sql(device_id, time, value)
    undo_sql = _scada_delete_row_sql(device_id, time)

    redo_cs = g_add_prefix | {'type': 'scada_device_data', 'device_id': device_id, 'time': time, 'value': value}
    undo_cs = g_delete_prefix | {'type': 'scada_device_data', 'device_id': device_id, 'time': time}

    return DbChangeSet(redo_sql, undo_sql, [redo_cs], [undo_cs])


def add_scada_device_data(name: str, cs: ChangeSet) -> ChangeSet:
    """Insert one sample unless a row with the same device_id and time exists.

    Returns an empty ``ChangeSet`` when the row is already present; otherwise
    returns the result of ``execute_command``.
    """
    operation = cs.operations[0]
    row = try_read(name, _scada_select_row_sql(operation['device_id'], operation['time']))
    if row is not None:
        # Row already exists: nothing to add.
        return ChangeSet()
    return execute_command(name, _add_scada_device_data(name, cs), False)


def _delete_scada_device_data(name: str, cs: ChangeSet) -> DbChangeSet:
    """Build the redo/undo change set that deletes one sample.

    Only ``cs.operations[0]`` is consulted; it must provide ``'device_id'`` and
    ``'time'``. The stored value is read first so the undo side can re-insert
    it, so the row is expected to exist when this is called.
    """
    values = cs.operations[0]
    device_id = values['device_id']
    time = values['time']
    value = float(read(name, _scada_select_row_sql(device_id, time))['value'])

    redo_sql = _scada_delete_row_sql(device_id, time)
    undo_sql = _scada_insert_sql(device_id, time, value)

    redo_cs = g_delete_prefix | {'type': 'scada_device_data', 'device_id': device_id, 'time': time}
    undo_cs = g_add_prefix | {'type': 'scada_device_data', 'device_id': device_id, 'time': time, 'value': value}

    return DbChangeSet(redo_sql, undo_sql, [redo_cs], [undo_cs])


def delete_scada_device_data(name: str, cs: ChangeSet) -> ChangeSet:
    """Delete one sample if a row with the given device_id and time exists.

    Returns an empty ``ChangeSet`` when there is no such row; otherwise
    returns the result of ``execute_command``.
    """
    operation = cs.operations[0]
    row = try_read(name, _scada_select_row_sql(operation['device_id'], operation['time']))
    if row is None:
        # Nothing stored for this (device_id, time): nothing to delete.
        return ChangeSet()
    return execute_command(name, _delete_scada_device_data(name, cs), False)