From f5547d80c9ea897f253820e5d0a2fcf67aa6213f Mon Sep 17 00:00:00 2001 From: JIANG Date: Mon, 8 Dec 2025 17:41:54 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9B=B4=E6=96=B0=E6=95=B0=E6=8D=AE=E5=AF=BC?= =?UTF-8?q?=E5=85=A5=E6=96=B9=E6=B3=95=EF=BC=8Ccopy=E8=AF=AD=E6=B3=95?= =?UTF-8?q?=E6=94=B9=E4=B8=BAinsert=E8=AF=AD=E6=B3=95=E4=BB=A5=E6=94=AF?= =?UTF-8?q?=E6=8C=81=E5=86=B2=E7=AA=81=E6=97=B6=E6=95=B0=E6=8D=AE=E8=A6=86?= =?UTF-8?q?=E7=9B=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- timescaledb/internal_queries.py | 3 - timescaledb/schemas/realtime.py | 132 ++++++++++++++---------------- timescaledb/schemas/scheme.py | 140 ++++++++++++++------------------ 3 files changed, 124 insertions(+), 151 deletions(-) diff --git a/timescaledb/internal_queries.py b/timescaledb/internal_queries.py index dab72c0..5d17d6d 100644 --- a/timescaledb/internal_queries.py +++ b/timescaledb/internal_queries.py @@ -28,12 +28,9 @@ class InternalStorage: else timescaledb_info.get_pgconn_string() ) with psycopg.Connection.connect(conn_string) as conn: - starttime = time.time() RealtimeRepository.store_realtime_simulation_result_sync( conn, node_result_list, link_result_list, result_start_time ) - endtime = time.time() - logger.info(f"存储实时模拟结果耗时: {endtime - starttime} 秒") break # 成功 except Exception as e: logger.error(f"存储尝试 {attempt + 1} 失败: {e}") diff --git a/timescaledb/schemas/realtime.py b/timescaledb/schemas/realtime.py index 07215d6..202d6f8 100644 --- a/timescaledb/schemas/realtime.py +++ b/timescaledb/schemas/realtime.py @@ -12,55 +12,51 @@ class RealtimeRepository: @staticmethod async def insert_links_batch(conn: AsyncConnection, data: List[dict]): - """Batch insert for realtime.link_simulation using COPY for performance.""" + """Batch insert for realtime.link_simulation using INSERT for performance.""" if not data: return - + query = """ + INSERT INTO realtime.link_simulation (time, id, flow, friction, headloss, quality, reaction, setting, status, velocity) + VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s) + ON CONFLICT (time, id) DO UPDATE SET + flow = EXCLUDED.flow, + friction = EXCLUDED.friction, + headloss = EXCLUDED.headloss, + quality = EXCLUDED.quality, + reaction = EXCLUDED.reaction, + setting = EXCLUDED.setting, + status = EXCLUDED.status, + velocity = EXCLUDED.velocity + """ async with conn.cursor() as cur: - async with cur.copy( - "COPY realtime.link_simulation (time, id, flow, friction, headloss, quality, reaction, setting, status, velocity) FROM STDIN" - ) as copy: - for item in data: - await copy.write_row( - ( - item["time"], - item["id"], - item.get("flow"), - item.get("friction"), - item.get("headloss"), - item.get("quality"), - item.get("reaction"), - item.get("setting"), - item.get("status"), - item.get("velocity"), - ) - ) + await cur.executemany(query, [ + (item["time"], item["id"], item.get("flow"), item.get("friction"), item.get("headloss"), item.get("quality"), item.get("reaction"), item.get("setting"), item.get("status"), item.get("velocity")) + for item in data + ]) @staticmethod def insert_links_batch_sync(conn: Connection, data: List[dict]): - """Batch insert for realtime.link_simulation using COPY for performance (sync version).""" + """Batch insert for realtime.link_simulation using INSERT for performance (sync version).""" if not data: return - + query = """ + INSERT INTO realtime.link_simulation (time, id, flow, friction, headloss, quality, reaction, setting, status, velocity) + VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s) + ON CONFLICT (time, id) DO UPDATE SET + flow = EXCLUDED.flow, + friction = EXCLUDED.friction, + headloss = EXCLUDED.headloss, + quality = EXCLUDED.quality, + reaction = EXCLUDED.reaction, + setting = EXCLUDED.setting, + status = EXCLUDED.status, + velocity = EXCLUDED.velocity + """ with conn.cursor() as cur: - with cur.copy( - "COPY realtime.link_simulation (time, id, flow, friction, headloss, quality, reaction, setting, status, velocity) FROM STDIN" - ) as copy: - for item in data: - copy.write_row( - ( - item["time"], - item["id"], - item.get("flow"), - item.get("friction"), - item.get("headloss"), - item.get("quality"), - item.get("reaction"), - item.get("setting"), - item.get("status"), - item.get("velocity"), - ) - ) + cur.executemany(query, [ + (item["time"], item["id"], item.get("flow"), item.get("friction"), item.get("headloss"), item.get("quality"), item.get("reaction"), item.get("setting"), item.get("status"), item.get("velocity")) + for item in data + ]) @staticmethod async def get_link_by_time_range( @@ -189,43 +185,39 @@ class RealtimeRepository: async def insert_nodes_batch(conn: AsyncConnection, data: List[dict]): if not data: return - + query = """ + INSERT INTO realtime.node_simulation (time, id, actual_demand, total_head, pressure, quality) + VALUES (%s, %s, %s, %s, %s, %s) + ON CONFLICT (time, id) DO UPDATE SET + actual_demand = EXCLUDED.actual_demand, + total_head = EXCLUDED.total_head, + pressure = EXCLUDED.pressure, + quality = EXCLUDED.quality + """ async with conn.cursor() as cur: - async with cur.copy( - "COPY realtime.node_simulation (time, id, actual_demand, total_head, pressure, quality) FROM STDIN" - ) as copy: - for item in data: - await copy.write_row( - ( - item["time"], - item["id"], - item.get("actual_demand"), - item.get("total_head"), - item.get("pressure"), - item.get("quality"), - ) - ) + await cur.executemany(query, [ + (item["time"], item["id"], item.get("actual_demand"), item.get("total_head"), item.get("pressure"), item.get("quality")) + for item in data + ]) @staticmethod def insert_nodes_batch_sync(conn: Connection, data: List[dict]): if not data: return - + query = """ + INSERT INTO realtime.node_simulation (time, id, actual_demand, total_head, pressure, quality) + VALUES (%s, %s, %s, %s, %s, %s) + ON CONFLICT (time, id) DO UPDATE SET + actual_demand = EXCLUDED.actual_demand, + total_head = EXCLUDED.total_head, + pressure = EXCLUDED.pressure, + quality = EXCLUDED.quality + """ with conn.cursor() as cur: - with cur.copy( - "COPY realtime.node_simulation (time, id, actual_demand, total_head, pressure, quality) FROM STDIN" - ) as copy: - for item in data: - copy.write_row( - ( - item["time"], - item["id"], - item.get("actual_demand"), - item.get("total_head"), - item.get("pressure"), - item.get("quality"), - ) - ) + cur.executemany(query, [ + (item["time"], item["id"], item.get("actual_demand"), item.get("total_head"), item.get("pressure"), item.get("quality")) + for item in data + ]) @staticmethod async def get_node_by_time_range( diff --git a/timescaledb/schemas/scheme.py b/timescaledb/schemas/scheme.py index 030a2ed..44060f5 100644 --- a/timescaledb/schemas/scheme.py +++ b/timescaledb/schemas/scheme.py @@ -13,59 +13,51 @@ class SchemeRepository: @staticmethod async def insert_links_batch(conn: AsyncConnection, data: List[dict]): - """Batch insert for scheme.link_simulation using COPY for performance.""" + """Batch insert for scheme.link_simulation using INSERT for performance.""" if not data: return - + query = """ + INSERT INTO scheme.link_simulation (time, scheme_type, scheme_name, id, flow, friction, headloss, quality, reaction, setting, status, velocity) + VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) + ON CONFLICT (time, scheme_type, scheme_name, id) DO UPDATE SET + flow = EXCLUDED.flow, + friction = EXCLUDED.friction, + headloss = EXCLUDED.headloss, + quality = EXCLUDED.quality, + reaction = EXCLUDED.reaction, + setting = EXCLUDED.setting, + status = EXCLUDED.status, + velocity = EXCLUDED.velocity + """ async with conn.cursor() as cur: - async with cur.copy( - "COPY scheme.link_simulation (time, scheme_type, scheme_name, id, flow, friction, headloss, quality, reaction, setting, status, velocity) FROM STDIN" - ) as copy: - for item in data: - await copy.write_row( - ( - item["time"], - item["scheme_type"], - item["scheme_name"], - item["id"], - item.get("flow"), - item.get("friction"), - item.get("headloss"), - item.get("quality"), - item.get("reaction"), - item.get("setting"), - item.get("status"), - item.get("velocity"), - ) - ) + await cur.executemany(query, [ + (item["time"], item["scheme_type"], item["scheme_name"], item["id"], item.get("flow"), item.get("friction"), item.get("headloss"), item.get("quality"), item.get("reaction"), item.get("setting"), item.get("status"), item.get("velocity")) + for item in data + ]) @staticmethod def insert_links_batch_sync(conn: Connection, data: List[dict]): - """Batch insert for scheme.link_simulation using COPY for performance (sync version).""" + """Batch insert for scheme.link_simulation using INSERT for performance (sync version).""" if not data: return - + query = """ + INSERT INTO scheme.link_simulation (time, scheme_type, scheme_name, id, flow, friction, headloss, quality, reaction, setting, status, velocity) + VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) + ON CONFLICT (time, scheme_type, scheme_name, id) DO UPDATE SET + flow = EXCLUDED.flow, + friction = EXCLUDED.friction, + headloss = EXCLUDED.headloss, + quality = EXCLUDED.quality, + reaction = EXCLUDED.reaction, + setting = EXCLUDED.setting, + status = EXCLUDED.status, + velocity = EXCLUDED.velocity + """ with conn.cursor() as cur: - with cur.copy( - "COPY scheme.link_simulation (time, scheme_type, scheme_name, id, flow, friction, headloss, quality, reaction, setting, status, velocity) FROM STDIN" - ) as copy: - for item in data: - copy.write_row( - ( - item["time"], - item["scheme_type"], - item["scheme_name"], - item["id"], - item.get("flow"), - item.get("friction"), - item.get("headloss"), - item.get("quality"), - item.get("reaction"), - item.get("setting"), - item.get("status"), - item.get("velocity"), - ) - ) + cur.executemany(query, [ + (item["time"], item["scheme_type"], item["scheme_name"], item["id"], item.get("flow"), item.get("friction"), item.get("headloss"), item.get("quality"), item.get("reaction"), item.get("setting"), item.get("status"), item.get("velocity")) + for item in data + ]) @staticmethod async def get_link_by_scheme_and_time_range( @@ -215,47 +207,39 @@ class SchemeRepository: async def insert_nodes_batch(conn: AsyncConnection, data: List[dict]): if not data: return - + query = """ + INSERT INTO scheme.node_simulation (time, scheme_type, scheme_name, id, actual_demand, total_head, pressure, quality) + VALUES (%s, %s, %s, %s, %s, %s, %s, %s) + ON CONFLICT (time, scheme_type, scheme_name, id) DO UPDATE SET + actual_demand = EXCLUDED.actual_demand, + total_head = EXCLUDED.total_head, + pressure = EXCLUDED.pressure, + quality = EXCLUDED.quality + """ async with conn.cursor() as cur: - async with cur.copy( - "COPY scheme.node_simulation (time, scheme_type, scheme_name, id, actual_demand, total_head, pressure, quality) FROM STDIN" - ) as copy: - for item in data: - await copy.write_row( - ( - item["time"], - item["scheme_type"], - item["scheme_name"], - item["id"], - item.get("actual_demand"), - item.get("total_head"), - item.get("pressure"), - item.get("quality"), - ) - ) + await cur.executemany(query, [ + (item["time"], item["scheme_type"], item["scheme_name"], item["id"], item.get("actual_demand"), item.get("total_head"), item.get("pressure"), item.get("quality")) + for item in data + ]) @staticmethod def insert_nodes_batch_sync(conn: Connection, data: List[dict]): if not data: return - + query = """ + INSERT INTO scheme.node_simulation (time, scheme_type, scheme_name, id, actual_demand, total_head, pressure, quality) + VALUES (%s, %s, %s, %s, %s, %s, %s, %s) + ON CONFLICT (time, scheme_type, scheme_name, id) DO UPDATE SET + actual_demand = EXCLUDED.actual_demand, + total_head = EXCLUDED.total_head, + pressure = EXCLUDED.pressure, + quality = EXCLUDED.quality + """ with conn.cursor() as cur: - with cur.copy( - "COPY scheme.node_simulation (time, scheme_type, scheme_name, id, actual_demand, total_head, pressure, quality) FROM STDIN" - ) as copy: - for item in data: - copy.write_row( - ( - item["time"], - item["scheme_type"], - item["scheme_name"], - item["id"], - item.get("actual_demand"), - item.get("total_head"), - item.get("pressure"), - item.get("quality"), - ) - ) + cur.executemany(query, [ + (item["time"], item["scheme_type"], item["scheme_name"], item["id"], item.get("actual_demand"), item.get("total_head"), item.get("pressure"), item.get("quality")) + for item in data + ]) @staticmethod async def get_node_by_scheme_and_time_range(