From 77cc7236fca0b8c69fc55a9d0885ed06a16481d7 Mon Sep 17 00:00:00 2001 From: JIANG Date: Tue, 9 Dec 2025 17:37:48 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=8C=E6=88=90=E5=9C=A8=E7=BA=BF=E6=95=B0?= =?UTF-8?q?=E6=8D=AE=E5=B1=9E=E6=80=A7=E8=8E=B7=E5=8F=96=E6=96=B9=E6=B3=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- timescaledb/router.py | 24 ++++---- timescaledb/schemas/realtime.py | 15 +++-- timescaledb/schemas/scheme.py | 103 ++++++++++++++++++++++++-------- 3 files changed, 100 insertions(+), 42 deletions(-) diff --git a/timescaledb/router.py b/timescaledb/router.py index ad99abc..4f396ba 100644 --- a/timescaledb/router.py +++ b/timescaledb/router.py @@ -132,25 +132,27 @@ async def query_realtime_records_by_time_property( ): """Query all realtime records by time and property""" try: - return await RealtimeRepository.query_all_record_by_time_property( + results = await RealtimeRepository.query_all_record_by_time_property( conn, query_time, type, property ) + return {"results": results} except ValueError as e: raise HTTPException(status_code=400, detail=str(e)) @router.get("/realtime/query/by-id-time") async def query_realtime_simulation_by_id_time( - ID: str, + id: str, type: str, query_time: str, conn: AsyncConnection = Depends(get_database_connection), ): - """Query realtime simulation results by ID and time""" + """Query realtime simulation results by id and time""" try: - return await RealtimeRepository.query_simulation_result_by_ID_time( - conn, ID, type, query_time + results = await RealtimeRepository.query_simulation_result_by_id_time( + conn, id, type, query_time ) + return {"results": results} except ValueError as e: raise HTTPException(status_code=400, detail=str(e)) @@ -321,9 +323,10 @@ async def query_scheme_records_by_scheme_time_property( ): """Query all scheme records by scheme, time and property""" try: - return await SchemeRepository.query_all_record_by_scheme_time_property( + results = await SchemeRepository.query_all_record_by_scheme_time_property( conn, scheme_type, scheme_name, query_time, type, property ) + return {"results": results} except ValueError as e: raise HTTPException(status_code=400, detail=str(e)) @@ -332,16 +335,17 @@ async def query_scheme_records_by_scheme_time_property( async def query_scheme_simulation_by_id_time( scheme_type: str, scheme_name: str, - ID: str, + id: str, type: str, query_time: str, conn: AsyncConnection = Depends(get_database_connection), ): - """Query scheme simulation results by ID and time""" + """Query scheme simulation results by id and time""" try: - return await SchemeRepository.query_scheme_simulation_result_by_ID_time( - conn, scheme_type, scheme_name, ID, type, query_time + result = await SchemeRepository.query_scheme_simulation_result_by_id_time( + conn, scheme_type, scheme_name, id, type, query_time ) + return {"result": result} except ValueError as e: raise HTTPException(status_code=400, detail=str(e)) diff --git a/timescaledb/schemas/realtime.py b/timescaledb/schemas/realtime.py index 3ef85d8..83e66f0 100644 --- a/timescaledb/schemas/realtime.py +++ b/timescaledb/schemas/realtime.py @@ -566,22 +566,21 @@ class RealtimeRepository: raise ValueError(f"Invalid type: {type}. Must be 'node' or 'link'") # Format the results - results = [{"ID": item["id"], "value": item["value"]} for item in data] - return {"results": results} + return [{"ID": item["id"], "value": item["value"]} for item in data] @staticmethod - async def query_simulation_result_by_ID_time( + async def query_simulation_result_by_id_time( conn: AsyncConnection, - ID: str, + id: str, type: str, query_time: str, ) -> list[dict]: """ - Query simulation results by ID and time from TimescaleDB. + Query simulation results by id and time from TimescaleDB. Args: conn: Database connection - ID: The ID of the node or link + id: The id of the node or link type: Type of data ("node" or "link") query_time: Time to query (ISO format string) @@ -611,11 +610,11 @@ class RealtimeRepository: # Query based on type if type.lower() == "node": return await RealtimeRepository.get_node_by_time_range( - conn, start_time, end_time, ID + conn, start_time, end_time, id ) elif type.lower() == "link": return await RealtimeRepository.get_link_by_time_range( - conn, start_time, end_time, ID + conn, start_time, end_time, id ) else: raise ValueError(f"Invalid type: {type}. Must be 'node' or 'link'") diff --git a/timescaledb/schemas/scheme.py b/timescaledb/schemas/scheme.py index b50a943..044ab38 100644 --- a/timescaledb/schemas/scheme.py +++ b/timescaledb/schemas/scheme.py @@ -30,10 +30,26 @@ class SchemeRepository: velocity = EXCLUDED.velocity """ async with conn.cursor() as cur: - await cur.executemany(query, [ - (item["time"], item["scheme_type"], item["scheme_name"], item["id"], item.get("flow"), item.get("friction"), item.get("headloss"), item.get("quality"), item.get("reaction"), item.get("setting"), item.get("status"), item.get("velocity")) - for item in data - ]) + await cur.executemany( + query, + [ + ( + item["time"], + item["scheme_type"], + item["scheme_name"], + item["id"], + item.get("flow"), + item.get("friction"), + item.get("headloss"), + item.get("quality"), + item.get("reaction"), + item.get("setting"), + item.get("status"), + item.get("velocity"), + ) + for item in data + ], + ) @staticmethod def insert_links_batch_sync(conn: Connection, data: List[dict]): @@ -54,10 +70,26 @@ class SchemeRepository: velocity = EXCLUDED.velocity """ with conn.cursor() as cur: - cur.executemany(query, [ - (item["time"], item["scheme_type"], item["scheme_name"], item["id"], item.get("flow"), item.get("friction"), item.get("headloss"), item.get("quality"), item.get("reaction"), item.get("setting"), item.get("status"), item.get("velocity")) - for item in data - ]) + cur.executemany( + query, + [ + ( + item["time"], + item["scheme_type"], + item["scheme_name"], + item["id"], + item.get("flow"), + item.get("friction"), + item.get("headloss"), + item.get("quality"), + item.get("reaction"), + item.get("setting"), + item.get("status"), + item.get("velocity"), + ) + for item in data + ], + ) @staticmethod async def get_link_by_scheme_and_time_range( @@ -217,10 +249,22 @@ class SchemeRepository: quality = EXCLUDED.quality """ async with conn.cursor() as cur: - await cur.executemany(query, [ - (item["time"], item["scheme_type"], item["scheme_name"], item["id"], item.get("actual_demand"), item.get("total_head"), item.get("pressure"), item.get("quality")) - for item in data - ]) + await cur.executemany( + query, + [ + ( + item["time"], + item["scheme_type"], + item["scheme_name"], + item["id"], + item.get("actual_demand"), + item.get("total_head"), + item.get("pressure"), + item.get("quality"), + ) + for item in data + ], + ) @staticmethod def insert_nodes_batch_sync(conn: Connection, data: List[dict]): @@ -236,10 +280,22 @@ class SchemeRepository: quality = EXCLUDED.quality """ with conn.cursor() as cur: - cur.executemany(query, [ - (item["time"], item["scheme_type"], item["scheme_name"], item["id"], item.get("actual_demand"), item.get("total_head"), item.get("pressure"), item.get("quality")) - for item in data - ]) + cur.executemany( + query, + [ + ( + item["time"], + item["scheme_type"], + item["scheme_name"], + item["id"], + item.get("actual_demand"), + item.get("total_head"), + item.get("pressure"), + item.get("quality"), + ) + for item in data + ], + ) @staticmethod async def get_node_by_scheme_and_time_range( @@ -610,26 +666,25 @@ class SchemeRepository: raise ValueError(f"Invalid type: {type}. Must be 'node' or 'link'") # Format the results - results = [{"ID": item["id"], "value": item["value"]} for item in data] - return {"results": results} + return [{"ID": item["id"], "value": item["value"]} for item in data] @staticmethod - async def query_scheme_simulation_result_by_ID_time( + async def query_scheme_simulation_result_by_id_time( conn: AsyncConnection, scheme_type: str, scheme_name: str, - ID: str, + id: str, type: str, query_time: str, ) -> list[dict]: """ - Query scheme simulation results by ID and time from TimescaleDB. + Query scheme simulation results by id and time from TimescaleDB. Args: conn: Database connection scheme_type: Scheme type scheme_name: Scheme name - ID: The ID of the node or link + id: The id of the node or link type: Type of data ("node" or "link") query_time: Time to query (ISO format string) @@ -659,11 +714,11 @@ class SchemeRepository: # Query based on type if type.lower() == "node": return await SchemeRepository.get_node_by_scheme_and_time_range( - conn, scheme_type, scheme_name, start_time, end_time, ID + conn, scheme_type, scheme_name, start_time, end_time, id ) elif type.lower() == "link": return await SchemeRepository.get_link_by_scheme_and_time_range( - conn, scheme_type, scheme_name, start_time, end_time, ID + conn, scheme_type, scheme_name, start_time, end_time, id ) else: raise ValueError(f"Invalid type: {type}. Must be 'node' or 'link'")