更新数据导入方法,copy语法改为insert语法以支持冲突时数据覆盖
This commit is contained in:
@@ -28,12 +28,9 @@ class InternalStorage:
|
|||||||
else timescaledb_info.get_pgconn_string()
|
else timescaledb_info.get_pgconn_string()
|
||||||
)
|
)
|
||||||
with psycopg.Connection.connect(conn_string) as conn:
|
with psycopg.Connection.connect(conn_string) as conn:
|
||||||
starttime = time.time()
|
|
||||||
RealtimeRepository.store_realtime_simulation_result_sync(
|
RealtimeRepository.store_realtime_simulation_result_sync(
|
||||||
conn, node_result_list, link_result_list, result_start_time
|
conn, node_result_list, link_result_list, result_start_time
|
||||||
)
|
)
|
||||||
endtime = time.time()
|
|
||||||
logger.info(f"存储实时模拟结果耗时: {endtime - starttime} 秒")
|
|
||||||
break # 成功
|
break # 成功
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"存储尝试 {attempt + 1} 失败: {e}")
|
logger.error(f"存储尝试 {attempt + 1} 失败: {e}")
|
||||||
|
|||||||
@@ -12,55 +12,51 @@ class RealtimeRepository:
|
|||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
async def insert_links_batch(conn: AsyncConnection, data: List[dict]):
|
async def insert_links_batch(conn: AsyncConnection, data: List[dict]):
|
||||||
"""Batch insert for realtime.link_simulation using COPY for performance."""
|
"""Batch insert for realtime.link_simulation using INSERT for performance."""
|
||||||
if not data:
|
if not data:
|
||||||
return
|
return
|
||||||
|
query = """
|
||||||
|
INSERT INTO realtime.link_simulation (time, id, flow, friction, headloss, quality, reaction, setting, status, velocity)
|
||||||
|
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
|
||||||
|
ON CONFLICT (time, id) DO UPDATE SET
|
||||||
|
flow = EXCLUDED.flow,
|
||||||
|
friction = EXCLUDED.friction,
|
||||||
|
headloss = EXCLUDED.headloss,
|
||||||
|
quality = EXCLUDED.quality,
|
||||||
|
reaction = EXCLUDED.reaction,
|
||||||
|
setting = EXCLUDED.setting,
|
||||||
|
status = EXCLUDED.status,
|
||||||
|
velocity = EXCLUDED.velocity
|
||||||
|
"""
|
||||||
async with conn.cursor() as cur:
|
async with conn.cursor() as cur:
|
||||||
async with cur.copy(
|
await cur.executemany(query, [
|
||||||
"COPY realtime.link_simulation (time, id, flow, friction, headloss, quality, reaction, setting, status, velocity) FROM STDIN"
|
(item["time"], item["id"], item.get("flow"), item.get("friction"), item.get("headloss"), item.get("quality"), item.get("reaction"), item.get("setting"), item.get("status"), item.get("velocity"))
|
||||||
) as copy:
|
for item in data
|
||||||
for item in data:
|
])
|
||||||
await copy.write_row(
|
|
||||||
(
|
|
||||||
item["time"],
|
|
||||||
item["id"],
|
|
||||||
item.get("flow"),
|
|
||||||
item.get("friction"),
|
|
||||||
item.get("headloss"),
|
|
||||||
item.get("quality"),
|
|
||||||
item.get("reaction"),
|
|
||||||
item.get("setting"),
|
|
||||||
item.get("status"),
|
|
||||||
item.get("velocity"),
|
|
||||||
)
|
|
||||||
)
|
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def insert_links_batch_sync(conn: Connection, data: List[dict]):
|
def insert_links_batch_sync(conn: Connection, data: List[dict]):
|
||||||
"""Batch insert for realtime.link_simulation using COPY for performance (sync version)."""
|
"""Batch insert for realtime.link_simulation using INSERT for performance (sync version)."""
|
||||||
if not data:
|
if not data:
|
||||||
return
|
return
|
||||||
|
query = """
|
||||||
|
INSERT INTO realtime.link_simulation (time, id, flow, friction, headloss, quality, reaction, setting, status, velocity)
|
||||||
|
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
|
||||||
|
ON CONFLICT (time, id) DO UPDATE SET
|
||||||
|
flow = EXCLUDED.flow,
|
||||||
|
friction = EXCLUDED.friction,
|
||||||
|
headloss = EXCLUDED.headloss,
|
||||||
|
quality = EXCLUDED.quality,
|
||||||
|
reaction = EXCLUDED.reaction,
|
||||||
|
setting = EXCLUDED.setting,
|
||||||
|
status = EXCLUDED.status,
|
||||||
|
velocity = EXCLUDED.velocity
|
||||||
|
"""
|
||||||
with conn.cursor() as cur:
|
with conn.cursor() as cur:
|
||||||
with cur.copy(
|
cur.executemany(query, [
|
||||||
"COPY realtime.link_simulation (time, id, flow, friction, headloss, quality, reaction, setting, status, velocity) FROM STDIN"
|
(item["time"], item["id"], item.get("flow"), item.get("friction"), item.get("headloss"), item.get("quality"), item.get("reaction"), item.get("setting"), item.get("status"), item.get("velocity"))
|
||||||
) as copy:
|
for item in data
|
||||||
for item in data:
|
])
|
||||||
copy.write_row(
|
|
||||||
(
|
|
||||||
item["time"],
|
|
||||||
item["id"],
|
|
||||||
item.get("flow"),
|
|
||||||
item.get("friction"),
|
|
||||||
item.get("headloss"),
|
|
||||||
item.get("quality"),
|
|
||||||
item.get("reaction"),
|
|
||||||
item.get("setting"),
|
|
||||||
item.get("status"),
|
|
||||||
item.get("velocity"),
|
|
||||||
)
|
|
||||||
)
|
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
async def get_link_by_time_range(
|
async def get_link_by_time_range(
|
||||||
@@ -189,43 +185,39 @@ class RealtimeRepository:
|
|||||||
async def insert_nodes_batch(conn: AsyncConnection, data: List[dict]):
|
async def insert_nodes_batch(conn: AsyncConnection, data: List[dict]):
|
||||||
if not data:
|
if not data:
|
||||||
return
|
return
|
||||||
|
query = """
|
||||||
|
INSERT INTO realtime.node_simulation (time, id, actual_demand, total_head, pressure, quality)
|
||||||
|
VALUES (%s, %s, %s, %s, %s, %s)
|
||||||
|
ON CONFLICT (time, id) DO UPDATE SET
|
||||||
|
actual_demand = EXCLUDED.actual_demand,
|
||||||
|
total_head = EXCLUDED.total_head,
|
||||||
|
pressure = EXCLUDED.pressure,
|
||||||
|
quality = EXCLUDED.quality
|
||||||
|
"""
|
||||||
async with conn.cursor() as cur:
|
async with conn.cursor() as cur:
|
||||||
async with cur.copy(
|
await cur.executemany(query, [
|
||||||
"COPY realtime.node_simulation (time, id, actual_demand, total_head, pressure, quality) FROM STDIN"
|
(item["time"], item["id"], item.get("actual_demand"), item.get("total_head"), item.get("pressure"), item.get("quality"))
|
||||||
) as copy:
|
for item in data
|
||||||
for item in data:
|
])
|
||||||
await copy.write_row(
|
|
||||||
(
|
|
||||||
item["time"],
|
|
||||||
item["id"],
|
|
||||||
item.get("actual_demand"),
|
|
||||||
item.get("total_head"),
|
|
||||||
item.get("pressure"),
|
|
||||||
item.get("quality"),
|
|
||||||
)
|
|
||||||
)
|
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def insert_nodes_batch_sync(conn: Connection, data: List[dict]):
|
def insert_nodes_batch_sync(conn: Connection, data: List[dict]):
|
||||||
if not data:
|
if not data:
|
||||||
return
|
return
|
||||||
|
query = """
|
||||||
|
INSERT INTO realtime.node_simulation (time, id, actual_demand, total_head, pressure, quality)
|
||||||
|
VALUES (%s, %s, %s, %s, %s, %s)
|
||||||
|
ON CONFLICT (time, id) DO UPDATE SET
|
||||||
|
actual_demand = EXCLUDED.actual_demand,
|
||||||
|
total_head = EXCLUDED.total_head,
|
||||||
|
pressure = EXCLUDED.pressure,
|
||||||
|
quality = EXCLUDED.quality
|
||||||
|
"""
|
||||||
with conn.cursor() as cur:
|
with conn.cursor() as cur:
|
||||||
with cur.copy(
|
cur.executemany(query, [
|
||||||
"COPY realtime.node_simulation (time, id, actual_demand, total_head, pressure, quality) FROM STDIN"
|
(item["time"], item["id"], item.get("actual_demand"), item.get("total_head"), item.get("pressure"), item.get("quality"))
|
||||||
) as copy:
|
for item in data
|
||||||
for item in data:
|
])
|
||||||
copy.write_row(
|
|
||||||
(
|
|
||||||
item["time"],
|
|
||||||
item["id"],
|
|
||||||
item.get("actual_demand"),
|
|
||||||
item.get("total_head"),
|
|
||||||
item.get("pressure"),
|
|
||||||
item.get("quality"),
|
|
||||||
)
|
|
||||||
)
|
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
async def get_node_by_time_range(
|
async def get_node_by_time_range(
|
||||||
|
|||||||
@@ -13,59 +13,51 @@ class SchemeRepository:
|
|||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
async def insert_links_batch(conn: AsyncConnection, data: List[dict]):
|
async def insert_links_batch(conn: AsyncConnection, data: List[dict]):
|
||||||
"""Batch insert for scheme.link_simulation using COPY for performance."""
|
"""Batch insert for scheme.link_simulation using INSERT for performance."""
|
||||||
if not data:
|
if not data:
|
||||||
return
|
return
|
||||||
|
query = """
|
||||||
|
INSERT INTO scheme.link_simulation (time, scheme_type, scheme_name, id, flow, friction, headloss, quality, reaction, setting, status, velocity)
|
||||||
|
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
|
||||||
|
ON CONFLICT (time, scheme_type, scheme_name, id) DO UPDATE SET
|
||||||
|
flow = EXCLUDED.flow,
|
||||||
|
friction = EXCLUDED.friction,
|
||||||
|
headloss = EXCLUDED.headloss,
|
||||||
|
quality = EXCLUDED.quality,
|
||||||
|
reaction = EXCLUDED.reaction,
|
||||||
|
setting = EXCLUDED.setting,
|
||||||
|
status = EXCLUDED.status,
|
||||||
|
velocity = EXCLUDED.velocity
|
||||||
|
"""
|
||||||
async with conn.cursor() as cur:
|
async with conn.cursor() as cur:
|
||||||
async with cur.copy(
|
await cur.executemany(query, [
|
||||||
"COPY scheme.link_simulation (time, scheme_type, scheme_name, id, flow, friction, headloss, quality, reaction, setting, status, velocity) FROM STDIN"
|
(item["time"], item["scheme_type"], item["scheme_name"], item["id"], item.get("flow"), item.get("friction"), item.get("headloss"), item.get("quality"), item.get("reaction"), item.get("setting"), item.get("status"), item.get("velocity"))
|
||||||
) as copy:
|
for item in data
|
||||||
for item in data:
|
])
|
||||||
await copy.write_row(
|
|
||||||
(
|
|
||||||
item["time"],
|
|
||||||
item["scheme_type"],
|
|
||||||
item["scheme_name"],
|
|
||||||
item["id"],
|
|
||||||
item.get("flow"),
|
|
||||||
item.get("friction"),
|
|
||||||
item.get("headloss"),
|
|
||||||
item.get("quality"),
|
|
||||||
item.get("reaction"),
|
|
||||||
item.get("setting"),
|
|
||||||
item.get("status"),
|
|
||||||
item.get("velocity"),
|
|
||||||
)
|
|
||||||
)
|
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def insert_links_batch_sync(conn: Connection, data: List[dict]):
|
def insert_links_batch_sync(conn: Connection, data: List[dict]):
|
||||||
"""Batch insert for scheme.link_simulation using COPY for performance (sync version)."""
|
"""Batch insert for scheme.link_simulation using INSERT for performance (sync version)."""
|
||||||
if not data:
|
if not data:
|
||||||
return
|
return
|
||||||
|
query = """
|
||||||
|
INSERT INTO scheme.link_simulation (time, scheme_type, scheme_name, id, flow, friction, headloss, quality, reaction, setting, status, velocity)
|
||||||
|
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
|
||||||
|
ON CONFLICT (time, scheme_type, scheme_name, id) DO UPDATE SET
|
||||||
|
flow = EXCLUDED.flow,
|
||||||
|
friction = EXCLUDED.friction,
|
||||||
|
headloss = EXCLUDED.headloss,
|
||||||
|
quality = EXCLUDED.quality,
|
||||||
|
reaction = EXCLUDED.reaction,
|
||||||
|
setting = EXCLUDED.setting,
|
||||||
|
status = EXCLUDED.status,
|
||||||
|
velocity = EXCLUDED.velocity
|
||||||
|
"""
|
||||||
with conn.cursor() as cur:
|
with conn.cursor() as cur:
|
||||||
with cur.copy(
|
cur.executemany(query, [
|
||||||
"COPY scheme.link_simulation (time, scheme_type, scheme_name, id, flow, friction, headloss, quality, reaction, setting, status, velocity) FROM STDIN"
|
(item["time"], item["scheme_type"], item["scheme_name"], item["id"], item.get("flow"), item.get("friction"), item.get("headloss"), item.get("quality"), item.get("reaction"), item.get("setting"), item.get("status"), item.get("velocity"))
|
||||||
) as copy:
|
for item in data
|
||||||
for item in data:
|
])
|
||||||
copy.write_row(
|
|
||||||
(
|
|
||||||
item["time"],
|
|
||||||
item["scheme_type"],
|
|
||||||
item["scheme_name"],
|
|
||||||
item["id"],
|
|
||||||
item.get("flow"),
|
|
||||||
item.get("friction"),
|
|
||||||
item.get("headloss"),
|
|
||||||
item.get("quality"),
|
|
||||||
item.get("reaction"),
|
|
||||||
item.get("setting"),
|
|
||||||
item.get("status"),
|
|
||||||
item.get("velocity"),
|
|
||||||
)
|
|
||||||
)
|
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
async def get_link_by_scheme_and_time_range(
|
async def get_link_by_scheme_and_time_range(
|
||||||
@@ -215,47 +207,39 @@ class SchemeRepository:
|
|||||||
async def insert_nodes_batch(conn: AsyncConnection, data: List[dict]):
|
async def insert_nodes_batch(conn: AsyncConnection, data: List[dict]):
|
||||||
if not data:
|
if not data:
|
||||||
return
|
return
|
||||||
|
query = """
|
||||||
|
INSERT INTO scheme.node_simulation (time, scheme_type, scheme_name, id, actual_demand, total_head, pressure, quality)
|
||||||
|
VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
|
||||||
|
ON CONFLICT (time, scheme_type, scheme_name, id) DO UPDATE SET
|
||||||
|
actual_demand = EXCLUDED.actual_demand,
|
||||||
|
total_head = EXCLUDED.total_head,
|
||||||
|
pressure = EXCLUDED.pressure,
|
||||||
|
quality = EXCLUDED.quality
|
||||||
|
"""
|
||||||
async with conn.cursor() as cur:
|
async with conn.cursor() as cur:
|
||||||
async with cur.copy(
|
await cur.executemany(query, [
|
||||||
"COPY scheme.node_simulation (time, scheme_type, scheme_name, id, actual_demand, total_head, pressure, quality) FROM STDIN"
|
(item["time"], item["scheme_type"], item["scheme_name"], item["id"], item.get("actual_demand"), item.get("total_head"), item.get("pressure"), item.get("quality"))
|
||||||
) as copy:
|
for item in data
|
||||||
for item in data:
|
])
|
||||||
await copy.write_row(
|
|
||||||
(
|
|
||||||
item["time"],
|
|
||||||
item["scheme_type"],
|
|
||||||
item["scheme_name"],
|
|
||||||
item["id"],
|
|
||||||
item.get("actual_demand"),
|
|
||||||
item.get("total_head"),
|
|
||||||
item.get("pressure"),
|
|
||||||
item.get("quality"),
|
|
||||||
)
|
|
||||||
)
|
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def insert_nodes_batch_sync(conn: Connection, data: List[dict]):
|
def insert_nodes_batch_sync(conn: Connection, data: List[dict]):
|
||||||
if not data:
|
if not data:
|
||||||
return
|
return
|
||||||
|
query = """
|
||||||
|
INSERT INTO scheme.node_simulation (time, scheme_type, scheme_name, id, actual_demand, total_head, pressure, quality)
|
||||||
|
VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
|
||||||
|
ON CONFLICT (time, scheme_type, scheme_name, id) DO UPDATE SET
|
||||||
|
actual_demand = EXCLUDED.actual_demand,
|
||||||
|
total_head = EXCLUDED.total_head,
|
||||||
|
pressure = EXCLUDED.pressure,
|
||||||
|
quality = EXCLUDED.quality
|
||||||
|
"""
|
||||||
with conn.cursor() as cur:
|
with conn.cursor() as cur:
|
||||||
with cur.copy(
|
cur.executemany(query, [
|
||||||
"COPY scheme.node_simulation (time, scheme_type, scheme_name, id, actual_demand, total_head, pressure, quality) FROM STDIN"
|
(item["time"], item["scheme_type"], item["scheme_name"], item["id"], item.get("actual_demand"), item.get("total_head"), item.get("pressure"), item.get("quality"))
|
||||||
) as copy:
|
for item in data
|
||||||
for item in data:
|
])
|
||||||
copy.write_row(
|
|
||||||
(
|
|
||||||
item["time"],
|
|
||||||
item["scheme_type"],
|
|
||||||
item["scheme_name"],
|
|
||||||
item["id"],
|
|
||||||
item.get("actual_demand"),
|
|
||||||
item.get("total_head"),
|
|
||||||
item.get("pressure"),
|
|
||||||
item.get("quality"),
|
|
||||||
)
|
|
||||||
)
|
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
async def get_node_by_scheme_and_time_range(
|
async def get_node_by_scheme_and_time_range(
|
||||||
|
|||||||
Reference in New Issue
Block a user