还原旧依赖文件;使用事务 Delete And Copy 加速数据的存储/覆盖

This commit is contained in:
JIANG
2025-12-10 11:06:09 +08:00
parent 77cc7236fc
commit 8a9345dfcc
5 changed files with 260 additions and 250 deletions

View File

@@ -12,79 +12,75 @@ class RealtimeRepository:
@staticmethod
async def insert_links_batch(conn: AsyncConnection, data: List[dict]):
"""Batch insert for realtime.link_simulation using INSERT for performance."""
"""Batch insert for realtime.link_simulation using DELETE then COPY for performance."""
if not data:
return
query = """
INSERT INTO realtime.link_simulation (time, id, flow, friction, headloss, quality, reaction, setting, status, velocity)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
ON CONFLICT (time, id) DO UPDATE SET
flow = EXCLUDED.flow,
friction = EXCLUDED.friction,
headloss = EXCLUDED.headloss,
quality = EXCLUDED.quality,
reaction = EXCLUDED.reaction,
setting = EXCLUDED.setting,
status = EXCLUDED.status,
velocity = EXCLUDED.velocity
"""
async with conn.cursor() as cur:
await cur.executemany(
query,
[
(
item["time"],
item["id"],
item.get("flow"),
item.get("friction"),
item.get("headloss"),
item.get("quality"),
item.get("reaction"),
item.get("setting"),
item.get("status"),
item.get("velocity"),
)
for item in data
],
)
# 假设同一批次的数据时间是相同的
target_time = data[0]["time"]
# 使用事务确保原子性
async with conn.transaction():
async with conn.cursor() as cur:
# 1. 先删除该时间点的旧数据
await cur.execute(
"DELETE FROM realtime.link_simulation WHERE time = %s",
(target_time,)
)
# 2. 使用 COPY 快速写入新数据
async with cur.copy(
"COPY realtime.link_simulation (time, id, flow, friction, headloss, quality, reaction, setting, status, velocity) FROM STDIN"
) as copy:
for item in data:
await copy.write_row((
item["time"],
item["id"],
item.get("flow"),
item.get("friction"),
item.get("headloss"),
item.get("quality"),
item.get("reaction"),
item.get("setting"),
item.get("status"),
item.get("velocity"),
))
@staticmethod
def insert_links_batch_sync(conn: Connection, data: List[dict]):
"""Batch insert for realtime.link_simulation using INSERT for performance (sync version)."""
"""Batch insert for realtime.link_simulation using DELETE then COPY for performance (sync version)."""
if not data:
return
query = """
INSERT INTO realtime.link_simulation (time, id, flow, friction, headloss, quality, reaction, setting, status, velocity)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
ON CONFLICT (time, id) DO UPDATE SET
flow = EXCLUDED.flow,
friction = EXCLUDED.friction,
headloss = EXCLUDED.headloss,
quality = EXCLUDED.quality,
reaction = EXCLUDED.reaction,
setting = EXCLUDED.setting,
status = EXCLUDED.status,
velocity = EXCLUDED.velocity
"""
with conn.cursor() as cur:
cur.executemany(
query,
[
(
item["time"],
item["id"],
item.get("flow"),
item.get("friction"),
item.get("headloss"),
item.get("quality"),
item.get("reaction"),
item.get("setting"),
item.get("status"),
item.get("velocity"),
)
for item in data
],
)
# 假设同一批次的数据时间是相同的
target_time = data[0]["time"]
# 使用事务确保原子性
with conn.transaction():
with conn.cursor() as cur:
# 1. 先删除该时间点的旧数据
cur.execute(
"DELETE FROM realtime.link_simulation WHERE time = %s",
(target_time,)
)
# 2. 使用 COPY 快速写入新数据
with cur.copy(
"COPY realtime.link_simulation (time, id, flow, friction, headloss, quality, reaction, setting, status, velocity) FROM STDIN"
) as copy:
for item in data:
copy.write_row((
item["time"],
item["id"],
item.get("flow"),
item.get("friction"),
item.get("headloss"),
item.get("quality"),
item.get("reaction"),
item.get("setting"),
item.get("status"),
item.get("velocity"),
))
@staticmethod
async def get_link_by_time_range(
@@ -213,59 +209,63 @@ class RealtimeRepository:
async def insert_nodes_batch(conn: AsyncConnection, data: List[dict]):
    """Batch-write rows into realtime.node_simulation using DELETE then COPY.

    Replaces all rows for the batch's timestamp: old rows at that time are
    deleted, then the new rows are bulk-loaded with COPY (faster than
    row-by-row INSERT ... ON CONFLICT).

    Args:
        conn: Open async database connection (psycopg AsyncConnection).
        data: Rows to write. Each dict must contain "time" and "id";
            the remaining columns are optional and stored as NULL when absent.
    """
    if not data:
        return
    # All rows in one batch are assumed to share the same timestamp;
    # only data[0] is inspected — callers must uphold this invariant.
    target_time = data[0]["time"]
    # Wrap DELETE + COPY in one transaction so readers never observe a
    # partially replaced batch.
    async with conn.transaction():
        async with conn.cursor() as cur:
            # 1. Drop any previously stored rows for this timestamp.
            await cur.execute(
                "DELETE FROM realtime.node_simulation WHERE time = %s",
                (target_time,),
            )
            # 2. Bulk-load the new rows via COPY.
            async with cur.copy(
                "COPY realtime.node_simulation (time, id, actual_demand, total_head, pressure, quality) FROM STDIN"
            ) as copy:
                for item in data:
                    await copy.write_row((
                        item["time"],
                        item["id"],
                        item.get("actual_demand"),
                        item.get("total_head"),
                        item.get("pressure"),
                        item.get("quality"),
                    ))
@staticmethod
def insert_nodes_batch_sync(conn: Connection, data: List[dict]):
if not data:
return
query = """
INSERT INTO realtime.node_simulation (time, id, actual_demand, total_head, pressure, quality)
VALUES (%s, %s, %s, %s, %s, %s)
ON CONFLICT (time, id) DO UPDATE SET
actual_demand = EXCLUDED.actual_demand,
total_head = EXCLUDED.total_head,
pressure = EXCLUDED.pressure,
quality = EXCLUDED.quality
"""
with conn.cursor() as cur:
cur.executemany(
query,
[
(
item["time"],
item["id"],
item.get("actual_demand"),
item.get("total_head"),
item.get("pressure"),
item.get("quality"),
)
for item in data
],
)
# 假设同一批次的数据时间是相同的
target_time = data[0]["time"]
# 使用事务确保原子性
with conn.transaction():
with conn.cursor() as cur:
# 1. 先删除该时间点的旧数据
cur.execute(
"DELETE FROM realtime.node_simulation WHERE time = %s",
(target_time,)
)
# 2. 使用 COPY 快速写入新数据
with cur.copy(
"COPY realtime.node_simulation (time, id, actual_demand, total_head, pressure, quality) FROM STDIN"
) as copy:
for item in data:
copy.write_row((
item["time"],
item["id"],
item.get("actual_demand"),
item.get("total_head"),
item.get("pressure"),
item.get("quality"),
))
@staticmethod
async def get_node_by_time_range(