120 lines
4.2 KiB
Python
120 lines
4.2 KiB
Python
from .database import *
|
|
|
|
|
|
def get_emitter_schema(name: str) -> dict[str, dict[str, Any]]:
    """Describe the properties exposed by an emitter record.

    ``name`` is accepted for signature parity but is not consulted; the
    schema is a fixed literal.

    Returns:
        A mapping from property name to its metadata dict, which carries the
        keys ``type``, ``optional`` and ``readonly``.
    """
    junction_meta = {'type': 'str', 'optional': False, 'readonly': True}
    coefficient_meta = {'type': 'float', 'optional': True, 'readonly': False}
    return {'junction': junction_meta, 'coefficient': coefficient_meta}
|
|
|
|
|
|
def get_emitter(name: str, junction: str) -> dict[str, Any]:
    """Fetch the emitter record attached to a junction.

    Args:
        name: Forwarded unchanged to ``try_read`` as its first argument
            (presumably identifies the project database; confirm against
            the database module).
        junction: Junction identifier to look up in the ``emitters`` table.

    Returns:
        A dict with the keys ``junction`` and ``coefficient``. When no row
        matches, ``junction`` echoes the argument and ``coefficient`` is
        ``None``; otherwise the values are coerced to ``str`` and ``float``
        (a stored null coefficient stays ``None``).
    """
    # The statement is assembled as text, so double embedded single quotes to
    # keep the identifier inside its SQL string literal.
    # NOTE(review): switch to bound parameters if try_read supports them.
    quoted = str(junction).replace("'", "''")
    row = try_read(name, f"select * from emitters where junction = '{quoted}'")
    if row is None:
        return {'junction': junction, 'coefficient': None}

    coefficient = row['coefficient']
    return {
        'junction': str(row['junction']),
        'coefficient': float(coefficient) if coefficient is not None else None,
    }
|
|
|
|
|
|
class Emitter(object):
    """In-memory view of one emitter row plus SQL-ready renderings of its fields.

    Attributes set by ``__init__``:
        type: always the string ``'emitter'``.
        junction: junction identifier, coerced to ``str``.
        coefficient: ``float`` value, or ``None`` when absent/null in the input.
        f_type, f_junction, f_coefficient: the same values formatted for direct
            interpolation into SQL text (quoted strings, or the word ``null``).
    """

    def __init__(self, input: dict[str, Any]) -> None:
        """Build the emitter from a dict holding ``junction`` and, optionally, ``coefficient``.

        Raises:
            KeyError: if ``input`` has no ``junction`` key.
            ValueError / TypeError: if ``coefficient`` cannot be converted to ``float``.
        """
        self.type = 'emitter'
        self.junction = str(input['junction'])

        raw_coefficient = input.get('coefficient')
        self.coefficient = float(raw_coefficient) if raw_coefficient is not None else None

        self.f_type = f"'{self.type}'"
        # Double embedded single quotes so an identifier such as O'Neil stays
        # inside its SQL string literal instead of terminating it early.
        escaped_junction = self.junction.replace("'", "''")
        self.f_junction = f"'{escaped_junction}'"
        self.f_coefficient = self.coefficient if self.coefficient is not None else 'null'

    def as_dict(self) -> dict[str, Any]:
        """Return the ``type``, ``junction`` and ``coefficient`` fields as a plain dict."""
        return {'type': self.type, 'junction': self.junction, 'coefficient': self.coefficient}
|
|
|
|
|
|
def _emitter_replace_sql(emitter: Emitter) -> str:
    """Return SQL that deletes the junction's emitter row and re-inserts it when a coefficient is set."""
    sql = f"delete from emitters where junction = {emitter.f_junction};"
    if emitter.coefficient is not None:
        sql += f"\ninsert into emitters (junction, coefficient) values ({emitter.f_junction}, {emitter.f_coefficient});"
    return sql


def set_emitter_cmd(name: str, cs: ChangeSet) -> DbChangeSet:
    """Build the redo/undo command for updating one junction's emitter.

    Only the first entry of ``cs.operations`` is considered. The stored record
    is read once through ``get_emitter``; every schema property that is present
    in the operation and not marked ``readonly`` is overlaid on a copy of it.

    Args:
        name: Forwarded to ``get_emitter`` and ``get_emitter_schema``.
        cs: Change set whose first operation carries ``junction`` and,
            optionally, the new ``coefficient``.

    Returns:
        A ``DbChangeSet`` built from the redo SQL, the undo SQL, and
        single-element lists holding the redo and undo change dicts.
    """
    requested = cs.operations[0]

    # One read serves both the "old" snapshot and the base of the "new" state.
    current = get_emitter(name, requested['junction'])
    old = Emitter(current)

    merged = dict(current)
    for key, meta in get_emitter_schema(name).items():
        if key in requested and not meta['readonly']:
            merged[key] = requested[key]
    new = Emitter(merged)

    redo_sql = _emitter_replace_sql(new)
    undo_sql = _emitter_replace_sql(old)

    redo_cs = g_update_prefix | new.as_dict()
    undo_cs = g_update_prefix | old.as_dict()

    return DbChangeSet(redo_sql, undo_sql, [redo_cs], [undo_cs])
|
|
|
|
|
|
def set_emitter(name: str, cs: ChangeSet) -> ChangeSet:
    """Apply an emitter update.

    Builds the command with ``set_emitter_cmd`` and hands it to
    ``execute_command``, returning whatever that call returns.
    """
    command = set_emitter_cmd(name, cs)
    return execute_command(name, command)
|
|
|
|
|
|
#--------------------------------------------------------------
|
|
# [EPA2][IN][OUT]
|
|
# node Ke
|
|
#--------------------------------------------------------------
|
|
# [EPA3][IN][OUT]
|
|
# node Ke (exponent pattern)
|
|
#--------------------------------------------------------------
|
|
class InpEmitter:
    """One parsed data line of the emitter section: a junction id and a coefficient.

    Attributes:
        junction: first whitespace-separated token, as ``str``.
        coefficient: second token converted to ``float``.
    """

    def __init__(self, line: str) -> None:
        """Parse ``line``.

        Anything from the first ``;`` onward is treated as a trailing comment
        and ignored, so both ``J1 0.5 ;note`` and ``J1 0.5;note`` parse.

        Raises:
            IndexError: if fewer than two tokens remain before the comment.
            ValueError: if the second token is not a valid float.
        """
        data_part = line.split(';', 1)[0]
        tokens = data_part.split()

        self.junction = str(tokens[0])
        self.coefficient = float(tokens[1])
|
|
|
|
|
|
def inp_in_emitter(section: list[str]) -> ChangeSet:
    """Convert the lines of an emitter section into a change set.

    Each data line is parsed with ``InpEmitter`` and appended to the result as
    ``g_update_prefix`` merged with the emitter's type, junction and
    coefficient. Blank lines and comment lines (first non-blank character is
    ``;``) are skipped.

    Args:
        section: Raw text lines of the section.

    Returns:
        A ``ChangeSet`` with one appended entry per data line, in input order.
    """
    cs = ChangeSet()
    for raw_line in section:
        text = raw_line.strip()
        # Blank and comment-only lines carry no data; parsing them would raise
        # on the missing tokens.
        if not text or text.startswith(';'):
            continue
        parsed = InpEmitter(text)
        cs.append(g_update_prefix | {'type': 'emitter', 'junction': parsed.junction, 'coefficient': parsed.coefficient})
    return cs
|
|
|
|
|
|
def inp_in_emitter_new(name: str, line: str) -> None:
    """Parse one emitter data line and insert it into the ``emitters`` table.

    Anything from the first ``;`` onward is treated as a trailing comment.

    Args:
        name: Forwarded unchanged to ``write`` as its first argument.
        line: Text line holding the junction id followed by the coefficient.

    Raises:
        IndexError: if fewer than two tokens remain before the comment.
        ValueError: if the coefficient token is not a valid float.
    """
    tokens = line.split(';', 1)[0].split()

    junction = str(tokens[0])
    coefficient = float(tokens[1])

    # The id comes straight from file text; double embedded single quotes so it
    # cannot terminate the SQL string literal.
    # NOTE(review): switch to bound parameters if write supports them.
    quoted_junction = junction.replace("'", "''")
    write(name, f"\ninsert into emitters (junction, coefficient) values ('{quoted_junction}', {coefficient});")
|
|
|
|
|
|
def inp_out_emitter(name: str) -> list[str]:
    """Render every row of the ``emitters`` table as a text line.

    Rows are fetched with ``read_all``; each becomes
    ``"<junction> <coefficient>"``, in the order the rows are returned.
    """
    rows = read_all(name, 'select * from emitters')
    return [f"{row['junction']} {row['coefficient']}" for row in rows]
|
|
|
|
|
|
def delete_emitter_by_junction(name: str, junction: str) -> ChangeSet:
    """Produce the change set that clears a junction's emitter, if it has one.

    Args:
        name: Forwarded unchanged to ``try_read`` as its first argument.
        junction: Junction identifier whose emitter should be cleared.

    Returns:
        An empty ``ChangeSet`` when no emitter row exists for the junction;
        otherwise a ``ChangeSet`` built from one update entry that sets the
        emitter's ``coefficient`` to ``None``.
    """
    # Double embedded single quotes so the id stays inside its SQL literal.
    quoted = str(junction).replace("'", "''")
    row = try_read(name, f"select * from emitters where junction = '{quoted}'")
    if row is None:
        return ChangeSet()
    return ChangeSet(g_update_prefix | {'type': 'emitter', 'junction': junction, 'coefficient': None})
|