diff --git a/requirements.txt b/requirements.txt index 0f77826..1db579a 100644 Binary files a/requirements.txt and b/requirements.txt differ diff --git a/run_server.py b/run_server.py index 0e0d57e..958ce34 100644 --- a/run_server.py +++ b/run_server.py @@ -8,20 +8,11 @@ if __name__ == "__main__": if sys.platform == "win32": asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy()) - # 创建配置对象 - config = uvicorn.Config( + # 用 uvicorn.run 支持 workers 参数 + uvicorn.run( "main:app", host="0.0.0.0", port=8000, - loop="asyncio", # 强制使用 asyncio 事件循环 - workers=2, # 如果需要多进程,可以取消注释 + workers=2, # 这里可以设置多进程 + loop="asyncio", ) - server = uvicorn.Server(config) - - # 创建并设置事件循环 - if sys.platform == "win32": - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - - # 运行服务器 - asyncio.run(server.serve()) diff --git a/simulation.py b/simulation.py index 374e322..da7f4bf 100644 --- a/simulation.py +++ b/simulation.py @@ -1197,7 +1197,7 @@ def run_simulation( cs.append(valve_status) set_status(name_c, cs) # 运行并返回结果 - result = run_project(name_c) + run_project(name_c) time_cost_end = time.perf_counter() print( "{} -- Hydraulic simulation finished, cost time: {:.2f} s.".format( @@ -1227,6 +1227,7 @@ def run_simulation( # print(num_periods_result) # print(node_result) # 存储 + starttime = time.time() if simulation_type.upper() == "REALTIME": TimescaleInternalStorage.store_realtime_simulation( node_result, link_result, modify_pattern_start_time @@ -1240,6 +1241,8 @@ def run_simulation( modify_pattern_start_time, num_periods_result, ) + endtime = time.time() + logging.info("store time: %f", endtime - starttime) # 暂不需要再次存储 SCADA 模拟信息 # TimescaleInternalStorage.fill_scheme_simulation_result_to_SCADA(scheme_Type=scheme_Type, scheme_Name=scheme_Name) diff --git a/timescaledb/schemas/realtime.py b/timescaledb/schemas/realtime.py index 83e66f0..2cc00f8 100644 --- a/timescaledb/schemas/realtime.py +++ b/timescaledb/schemas/realtime.py @@ -12,79 +12,75 @@ class RealtimeRepository: @staticmethod async def insert_links_batch(conn: AsyncConnection, data: List[dict]): - """Batch insert for realtime.link_simulation using INSERT for performance.""" + """Batch insert for realtime.link_simulation using DELETE then COPY for performance.""" if not data: return - query = """ - INSERT INTO realtime.link_simulation (time, id, flow, friction, headloss, quality, reaction, setting, status, velocity) - VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s) - ON CONFLICT (time, id) DO UPDATE SET - flow = EXCLUDED.flow, - friction = EXCLUDED.friction, - headloss = EXCLUDED.headloss, - quality = EXCLUDED.quality, - reaction = EXCLUDED.reaction, - setting = EXCLUDED.setting, - status = EXCLUDED.status, - velocity = EXCLUDED.velocity - """ - async with conn.cursor() as cur: - await cur.executemany( - query, - [ - ( - item["time"], - item["id"], - item.get("flow"), - item.get("friction"), - item.get("headloss"), - item.get("quality"), - item.get("reaction"), - item.get("setting"), - item.get("status"), - item.get("velocity"), - ) - for item in data - ], - ) + + # 假设同一批次的数据时间是相同的 + target_time = data[0]["time"] + + # 使用事务确保原子性 + async with conn.transaction(): + async with conn.cursor() as cur: + # 1. 先删除该时间点的旧数据 + await cur.execute( + "DELETE FROM realtime.link_simulation WHERE time = %s", + (target_time,) + ) + + # 2. 使用 COPY 快速写入新数据 + async with cur.copy( + "COPY realtime.link_simulation (time, id, flow, friction, headloss, quality, reaction, setting, status, velocity) FROM STDIN" + ) as copy: + for item in data: + await copy.write_row(( + item["time"], + item["id"], + item.get("flow"), + item.get("friction"), + item.get("headloss"), + item.get("quality"), + item.get("reaction"), + item.get("setting"), + item.get("status"), + item.get("velocity"), + )) @staticmethod def insert_links_batch_sync(conn: Connection, data: List[dict]): - """Batch insert for realtime.link_simulation using INSERT for performance (sync version).""" + """Batch insert for realtime.link_simulation using DELETE then COPY for performance (sync version).""" if not data: return - query = """ - INSERT INTO realtime.link_simulation (time, id, flow, friction, headloss, quality, reaction, setting, status, velocity) - VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s) - ON CONFLICT (time, id) DO UPDATE SET - flow = EXCLUDED.flow, - friction = EXCLUDED.friction, - headloss = EXCLUDED.headloss, - quality = EXCLUDED.quality, - reaction = EXCLUDED.reaction, - setting = EXCLUDED.setting, - status = EXCLUDED.status, - velocity = EXCLUDED.velocity - """ - with conn.cursor() as cur: - cur.executemany( - query, - [ - ( - item["time"], - item["id"], - item.get("flow"), - item.get("friction"), - item.get("headloss"), - item.get("quality"), - item.get("reaction"), - item.get("setting"), - item.get("status"), - item.get("velocity"), - ) - for item in data - ], - ) + + # 假设同一批次的数据时间是相同的 + target_time = data[0]["time"] + + # 使用事务确保原子性 + with conn.transaction(): + with conn.cursor() as cur: + # 1. 先删除该时间点的旧数据 + cur.execute( + "DELETE FROM realtime.link_simulation WHERE time = %s", + (target_time,) + ) + + # 2. 使用 COPY 快速写入新数据 + with cur.copy( + "COPY realtime.link_simulation (time, id, flow, friction, headloss, quality, reaction, setting, status, velocity) FROM STDIN" + ) as copy: + for item in data: + copy.write_row(( + item["time"], + item["id"], + item.get("flow"), + item.get("friction"), + item.get("headloss"), + item.get("quality"), + item.get("reaction"), + item.get("setting"), + item.get("status"), + item.get("velocity"), + )) @staticmethod async def get_link_by_time_range( @@ -213,59 +209,63 @@ class RealtimeRepository: async def insert_nodes_batch(conn: AsyncConnection, data: List[dict]): if not data: return - query = """ - INSERT INTO realtime.node_simulation (time, id, actual_demand, total_head, pressure, quality) - VALUES (%s, %s, %s, %s, %s, %s) - ON CONFLICT (time, id) DO UPDATE SET - actual_demand = EXCLUDED.actual_demand, - total_head = EXCLUDED.total_head, - pressure = EXCLUDED.pressure, - quality = EXCLUDED.quality - """ - async with conn.cursor() as cur: - await cur.executemany( - query, - [ - ( - item["time"], - item["id"], - item.get("actual_demand"), - item.get("total_head"), - item.get("pressure"), - item.get("quality"), - ) - for item in data - ], - ) + + # 假设同一批次的数据时间是相同的 + target_time = data[0]["time"] + + # 使用事务确保原子性 + async with conn.transaction(): + async with conn.cursor() as cur: + # 1. 先删除该时间点的旧数据 + await cur.execute( + "DELETE FROM realtime.node_simulation WHERE time = %s", + (target_time,) + ) + + # 2. 使用 COPY 快速写入新数据 + async with cur.copy( + "COPY realtime.node_simulation (time, id, actual_demand, total_head, pressure, quality) FROM STDIN" + ) as copy: + for item in data: + await copy.write_row(( + item["time"], + item["id"], + item.get("actual_demand"), + item.get("total_head"), + item.get("pressure"), + item.get("quality"), + )) @staticmethod def insert_nodes_batch_sync(conn: Connection, data: List[dict]): if not data: return - query = """ - INSERT INTO realtime.node_simulation (time, id, actual_demand, total_head, pressure, quality) - VALUES (%s, %s, %s, %s, %s, %s) - ON CONFLICT (time, id) DO UPDATE SET - actual_demand = EXCLUDED.actual_demand, - total_head = EXCLUDED.total_head, - pressure = EXCLUDED.pressure, - quality = EXCLUDED.quality - """ - with conn.cursor() as cur: - cur.executemany( - query, - [ - ( - item["time"], - item["id"], - item.get("actual_demand"), - item.get("total_head"), - item.get("pressure"), - item.get("quality"), - ) - for item in data - ], - ) + + # 假设同一批次的数据时间是相同的 + target_time = data[0]["time"] + + # 使用事务确保原子性 + with conn.transaction(): + with conn.cursor() as cur: + # 1. 先删除该时间点的旧数据 + cur.execute( + "DELETE FROM realtime.node_simulation WHERE time = %s", + (target_time,) + ) + + # 2. 使用 COPY 快速写入新数据 + with cur.copy( + "COPY realtime.node_simulation (time, id, actual_demand, total_head, pressure, quality) FROM STDIN" + ) as copy: + for item in data: + copy.write_row(( + item["time"], + item["id"], + item.get("actual_demand"), + item.get("total_head"), + item.get("pressure"), + item.get("quality"), + )) @staticmethod async def get_node_by_time_range( diff --git a/timescaledb/schemas/scheme.py b/timescaledb/schemas/scheme.py index 044ab38..e478cfe 100644 --- a/timescaledb/schemas/scheme.py +++ b/timescaledb/schemas/scheme.py @@ -13,83 +13,87 @@ class SchemeRepository: @staticmethod async def insert_links_batch(conn: AsyncConnection, data: List[dict]): - """Batch insert for scheme.link_simulation using INSERT for performance.""" + """Batch insert for scheme.link_simulation using DELETE then COPY for performance.""" if not data: return - query = """ - INSERT INTO scheme.link_simulation (time, scheme_type, scheme_name, id, flow, friction, headloss, quality, reaction, setting, status, velocity) - VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) - ON CONFLICT (time, scheme_type, scheme_name, id) DO UPDATE SET - flow = EXCLUDED.flow, - friction = EXCLUDED.friction, - headloss = EXCLUDED.headloss, - quality = EXCLUDED.quality, - reaction = EXCLUDED.reaction, - setting = EXCLUDED.setting, - status = EXCLUDED.status, - velocity = EXCLUDED.velocity - """ - async with conn.cursor() as cur: - await cur.executemany( - query, - [ - ( - item["time"], - item["scheme_type"], - item["scheme_name"], - item["id"], - item.get("flow"), - item.get("friction"), - item.get("headloss"), - item.get("quality"), - item.get("reaction"), - item.get("setting"), - item.get("status"), - item.get("velocity"), - ) - for item in data - ], - ) + + # 假设同一批次的数据时间、scheme_type、scheme_name 是相同的 + target_time = data[0]["time"] + target_scheme_type = data[0]["scheme_type"] + target_scheme_name = data[0]["scheme_name"] + + # 使用事务确保原子性 + async with conn.transaction(): + async with conn.cursor() as cur: + # 1. 先删除该时间点、scheme_type、scheme_name 的旧数据 + await cur.execute( + "DELETE FROM scheme.link_simulation WHERE time = %s AND scheme_type = %s AND scheme_name = %s", + (target_time, target_scheme_type, target_scheme_name), + ) + + # 2. 使用 COPY 快速写入新数据 + async with cur.copy( + "COPY scheme.link_simulation (time, scheme_type, scheme_name, id, flow, friction, headloss, quality, reaction, setting, status, velocity) FROM STDIN" + ) as copy: + for item in data: + await copy.write_row( + ( + item["time"], + item["scheme_type"], + item["scheme_name"], + item["id"], + item.get("flow"), + item.get("friction"), + item.get("headloss"), + item.get("quality"), + item.get("reaction"), + item.get("setting"), + item.get("status"), + item.get("velocity"), + ) + ) @staticmethod def insert_links_batch_sync(conn: Connection, data: List[dict]): - """Batch insert for scheme.link_simulation using INSERT for performance (sync version).""" + """Batch insert for scheme.link_simulation using DELETE then COPY for performance (sync version).""" if not data: return - query = """ - INSERT INTO scheme.link_simulation (time, scheme_type, scheme_name, id, flow, friction, headloss, quality, reaction, setting, status, velocity) - VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) - ON CONFLICT (time, scheme_type, scheme_name, id) DO UPDATE SET - flow = EXCLUDED.flow, - friction = EXCLUDED.friction, - headloss = EXCLUDED.headloss, - quality = EXCLUDED.quality, - reaction = EXCLUDED.reaction, - setting = EXCLUDED.setting, - status = EXCLUDED.status, - velocity = EXCLUDED.velocity - """ - with conn.cursor() as cur: - cur.executemany( - query, - [ - ( - item["time"], - item["scheme_type"], - item["scheme_name"], - item["id"], - item.get("flow"), - item.get("friction"), - item.get("headloss"), - item.get("quality"), - item.get("reaction"), - item.get("setting"), - item.get("status"), - item.get("velocity"), - ) - for item in data - ], - ) + + # 假设同一批次的数据时间、scheme_type、scheme_name 是相同的 + target_time = data[0]["time"] + target_scheme_type = data[0]["scheme_type"] + target_scheme_name = data[0]["scheme_name"] + + # 使用事务确保原子性 + with conn.transaction(): + with conn.cursor() as cur: + # 1. 先删除该时间点、scheme_type、scheme_name 的旧数据 + cur.execute( + "DELETE FROM scheme.link_simulation WHERE time = %s AND scheme_type = %s AND scheme_name = %s", + (target_time, target_scheme_type, target_scheme_name), + ) + + # 2. 使用 COPY 快速写入新数据 + with cur.copy( + "COPY scheme.link_simulation (time, scheme_type, scheme_name, id, flow, friction, headloss, quality, reaction, setting, status, velocity) FROM STDIN" + ) as copy: + for item in data: + copy.write_row( + ( + item["time"], + item["scheme_type"], + item["scheme_name"], + item["id"], + item.get("flow"), + item.get("friction"), + item.get("headloss"), + item.get("quality"), + item.get("reaction"), + item.get("setting"), + item.get("status"), + item.get("velocity"), + ) + ) @staticmethod async def get_link_by_scheme_and_time_range( @@ -239,63 +243,75 @@ class SchemeRepository: async def insert_nodes_batch(conn: AsyncConnection, data: List[dict]): if not data: return - query = """ - INSERT INTO scheme.node_simulation (time, scheme_type, scheme_name, id, actual_demand, total_head, pressure, quality) - VALUES (%s, %s, %s, %s, %s, %s, %s, %s) - ON CONFLICT (time, scheme_type, scheme_name, id) DO UPDATE SET - actual_demand = EXCLUDED.actual_demand, - total_head = EXCLUDED.total_head, - pressure = EXCLUDED.pressure, - quality = EXCLUDED.quality - """ - async with conn.cursor() as cur: - await cur.executemany( - query, - [ - ( - item["time"], - item["scheme_type"], - item["scheme_name"], - item["id"], - item.get("actual_demand"), - item.get("total_head"), - item.get("pressure"), - item.get("quality"), - ) - for item in data - ], - ) + + # 假设同一批次的数据时间、scheme_type、scheme_name 是相同的 + target_time = data[0]["time"] + target_scheme_type = data[0]["scheme_type"] + target_scheme_name = data[0]["scheme_name"] + + # 使用事务确保原子性 + async with conn.transaction(): + async with conn.cursor() as cur: + # 1. 先删除该时间点、scheme_type、scheme_name 的旧数据 + await cur.execute( + "DELETE FROM scheme.node_simulation WHERE time = %s AND scheme_type = %s AND scheme_name = %s", + (target_time, target_scheme_type, target_scheme_name), + ) + + # 2. 使用 COPY 快速写入新数据 + async with cur.copy( + "COPY scheme.node_simulation (time, scheme_type, scheme_name, id, actual_demand, total_head, pressure, quality) FROM STDIN" + ) as copy: + for item in data: + await copy.write_row( + ( + item["time"], + item["scheme_type"], + item["scheme_name"], + item["id"], + item.get("actual_demand"), + item.get("total_head"), + item.get("pressure"), + item.get("quality"), + ) + ) @staticmethod def insert_nodes_batch_sync(conn: Connection, data: List[dict]): if not data: return - query = """ - INSERT INTO scheme.node_simulation (time, scheme_type, scheme_name, id, actual_demand, total_head, pressure, quality) - VALUES (%s, %s, %s, %s, %s, %s, %s, %s) - ON CONFLICT (time, scheme_type, scheme_name, id) DO UPDATE SET - actual_demand = EXCLUDED.actual_demand, - total_head = EXCLUDED.total_head, - pressure = EXCLUDED.pressure, - quality = EXCLUDED.quality - """ - with conn.cursor() as cur: - cur.executemany( - query, - [ - ( - item["time"], - item["scheme_type"], - item["scheme_name"], - item["id"], - item.get("actual_demand"), - item.get("total_head"), - item.get("pressure"), - item.get("quality"), - ) - for item in data - ], - ) + + # 假设同一批次的数据时间、scheme_type、scheme_name 是相同的 + target_time = data[0]["time"] + target_scheme_type = data[0]["scheme_type"] + target_scheme_name = data[0]["scheme_name"] + + # 使用事务确保原子性 + with conn.transaction(): + with conn.cursor() as cur: + # 1. 先删除该时间点、scheme_type、scheme_name 的旧数据 + cur.execute( + "DELETE FROM scheme.node_simulation WHERE time = %s AND scheme_type = %s AND scheme_name = %s", + (target_time, target_scheme_type, target_scheme_name), + ) + + # 2. 使用 COPY 快速写入新数据 + with cur.copy( + "COPY scheme.node_simulation (time, scheme_type, scheme_name, id, actual_demand, total_head, pressure, quality) FROM STDIN" + ) as copy: + for item in data: + copy.write_row( + ( + item["time"], + item["scheme_type"], + item["scheme_name"], + item["id"], + item.get("actual_demand"), + item.get("total_head"), + item.get("pressure"), + item.get("quality"), + ) + ) @staticmethod async def get_node_by_scheme_and_time_range(