完成在线数据属性获取方法

This commit is contained in:
JIANG
2025-12-09 17:37:48 +08:00
parent 18fc564efc
commit 77cc7236fc
3 changed files with 100 additions and 42 deletions

View File

@@ -132,25 +132,27 @@ async def query_realtime_records_by_time_property(
):
"""Query all realtime records by time and property"""
try:
return await RealtimeRepository.query_all_record_by_time_property(
results = await RealtimeRepository.query_all_record_by_time_property(
conn, query_time, type, property
)
return {"results": results}
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
@router.get("/realtime/query/by-id-time")
async def query_realtime_simulation_by_id_time(
ID: str,
id: str,
type: str,
query_time: str,
conn: AsyncConnection = Depends(get_database_connection),
):
"""Query realtime simulation results by ID and time"""
"""Query realtime simulation results by id and time"""
try:
return await RealtimeRepository.query_simulation_result_by_ID_time(
conn, ID, type, query_time
results = await RealtimeRepository.query_simulation_result_by_id_time(
conn, id, type, query_time
)
return {"results": results}
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
@@ -321,9 +323,10 @@ async def query_scheme_records_by_scheme_time_property(
):
"""Query all scheme records by scheme, time and property"""
try:
return await SchemeRepository.query_all_record_by_scheme_time_property(
results = await SchemeRepository.query_all_record_by_scheme_time_property(
conn, scheme_type, scheme_name, query_time, type, property
)
return {"results": results}
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
@@ -332,16 +335,17 @@ async def query_scheme_records_by_scheme_time_property(
async def query_scheme_simulation_by_id_time(
scheme_type: str,
scheme_name: str,
ID: str,
id: str,
type: str,
query_time: str,
conn: AsyncConnection = Depends(get_database_connection),
):
"""Query scheme simulation results by ID and time"""
"""Query scheme simulation results by id and time"""
try:
return await SchemeRepository.query_scheme_simulation_result_by_ID_time(
conn, scheme_type, scheme_name, ID, type, query_time
result = await SchemeRepository.query_scheme_simulation_result_by_id_time(
conn, scheme_type, scheme_name, id, type, query_time
)
return {"result": result}
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))

View File

@@ -566,22 +566,21 @@ class RealtimeRepository:
raise ValueError(f"Invalid type: {type}. Must be 'node' or 'link'")
# Format the results
results = [{"ID": item["id"], "value": item["value"]} for item in data]
return {"results": results}
return [{"ID": item["id"], "value": item["value"]} for item in data]
@staticmethod
async def query_simulation_result_by_ID_time(
async def query_simulation_result_by_id_time(
conn: AsyncConnection,
ID: str,
id: str,
type: str,
query_time: str,
) -> list[dict]:
"""
Query simulation results by ID and time from TimescaleDB.
Query simulation results by id and time from TimescaleDB.
Args:
conn: Database connection
ID: The ID of the node or link
id: The id of the node or link
type: Type of data ("node" or "link")
query_time: Time to query (ISO format string)
@@ -611,11 +610,11 @@ class RealtimeRepository:
# Query based on type
if type.lower() == "node":
return await RealtimeRepository.get_node_by_time_range(
conn, start_time, end_time, ID
conn, start_time, end_time, id
)
elif type.lower() == "link":
return await RealtimeRepository.get_link_by_time_range(
conn, start_time, end_time, ID
conn, start_time, end_time, id
)
else:
raise ValueError(f"Invalid type: {type}. Must be 'node' or 'link'")

View File

@@ -30,10 +30,26 @@ class SchemeRepository:
velocity = EXCLUDED.velocity
"""
async with conn.cursor() as cur:
await cur.executemany(query, [
(item["time"], item["scheme_type"], item["scheme_name"], item["id"], item.get("flow"), item.get("friction"), item.get("headloss"), item.get("quality"), item.get("reaction"), item.get("setting"), item.get("status"), item.get("velocity"))
for item in data
])
await cur.executemany(
query,
[
(
item["time"],
item["scheme_type"],
item["scheme_name"],
item["id"],
item.get("flow"),
item.get("friction"),
item.get("headloss"),
item.get("quality"),
item.get("reaction"),
item.get("setting"),
item.get("status"),
item.get("velocity"),
)
for item in data
],
)
@staticmethod
def insert_links_batch_sync(conn: Connection, data: List[dict]):
@@ -54,10 +70,26 @@ class SchemeRepository:
velocity = EXCLUDED.velocity
"""
with conn.cursor() as cur:
cur.executemany(query, [
(item["time"], item["scheme_type"], item["scheme_name"], item["id"], item.get("flow"), item.get("friction"), item.get("headloss"), item.get("quality"), item.get("reaction"), item.get("setting"), item.get("status"), item.get("velocity"))
for item in data
])
cur.executemany(
query,
[
(
item["time"],
item["scheme_type"],
item["scheme_name"],
item["id"],
item.get("flow"),
item.get("friction"),
item.get("headloss"),
item.get("quality"),
item.get("reaction"),
item.get("setting"),
item.get("status"),
item.get("velocity"),
)
for item in data
],
)
@staticmethod
async def get_link_by_scheme_and_time_range(
@@ -217,10 +249,22 @@ class SchemeRepository:
quality = EXCLUDED.quality
"""
async with conn.cursor() as cur:
await cur.executemany(query, [
(item["time"], item["scheme_type"], item["scheme_name"], item["id"], item.get("actual_demand"), item.get("total_head"), item.get("pressure"), item.get("quality"))
for item in data
])
await cur.executemany(
query,
[
(
item["time"],
item["scheme_type"],
item["scheme_name"],
item["id"],
item.get("actual_demand"),
item.get("total_head"),
item.get("pressure"),
item.get("quality"),
)
for item in data
],
)
@staticmethod
def insert_nodes_batch_sync(conn: Connection, data: List[dict]):
@@ -236,10 +280,22 @@ class SchemeRepository:
quality = EXCLUDED.quality
"""
with conn.cursor() as cur:
cur.executemany(query, [
(item["time"], item["scheme_type"], item["scheme_name"], item["id"], item.get("actual_demand"), item.get("total_head"), item.get("pressure"), item.get("quality"))
for item in data
])
cur.executemany(
query,
[
(
item["time"],
item["scheme_type"],
item["scheme_name"],
item["id"],
item.get("actual_demand"),
item.get("total_head"),
item.get("pressure"),
item.get("quality"),
)
for item in data
],
)
@staticmethod
async def get_node_by_scheme_and_time_range(
@@ -610,26 +666,25 @@ class SchemeRepository:
raise ValueError(f"Invalid type: {type}. Must be 'node' or 'link'")
# Format the results
results = [{"ID": item["id"], "value": item["value"]} for item in data]
return {"results": results}
return [{"ID": item["id"], "value": item["value"]} for item in data]
@staticmethod
async def query_scheme_simulation_result_by_ID_time(
async def query_scheme_simulation_result_by_id_time(
conn: AsyncConnection,
scheme_type: str,
scheme_name: str,
ID: str,
id: str,
type: str,
query_time: str,
) -> list[dict]:
"""
Query scheme simulation results by ID and time from TimescaleDB.
Query scheme simulation results by id and time from TimescaleDB.
Args:
conn: Database connection
scheme_type: Scheme type
scheme_name: Scheme name
ID: The ID of the node or link
id: The id of the node or link
type: Type of data ("node" or "link")
query_time: Time to query (ISO format string)
@@ -659,11 +714,11 @@ class SchemeRepository:
# Query based on type
if type.lower() == "node":
return await SchemeRepository.get_node_by_scheme_and_time_range(
conn, scheme_type, scheme_name, start_time, end_time, ID
conn, scheme_type, scheme_name, start_time, end_time, id
)
elif type.lower() == "link":
return await SchemeRepository.get_link_by_scheme_and_time_range(
conn, scheme_type, scheme_name, start_time, end_time, ID
conn, scheme_type, scheme_name, start_time, end_time, id
)
else:
raise ValueError(f"Invalid type: {type}. Must be 'node' or 'link'")