diff --git a/main.py b/main.py index 0eae34e..10eb2f1 100644 --- a/main.py +++ b/main.py @@ -112,7 +112,7 @@ async def verify_token(authorization: Annotated[str, Header()] = None): # 全局依赖项 # app = FastAPI(dependencies=[Depends(global_auth)]) -app = FastAPI() +# app = FastAPI() # 生命周期管理器 diff --git a/postgresql/database.py b/postgresql/database.py index 3616987..bf64648 100644 --- a/postgresql/database.py +++ b/postgresql/database.py @@ -21,14 +21,12 @@ class Database: try: self.pool = psycopg_pool.AsyncConnectionPool( conninfo=conn_string, - min_size=1, + min_size=5, max_size=20, open=False, # Don't open immediately, wait for startup kwargs={"row_factory": dict_row}, # Return rows as dictionaries ) - logger.info( - f"PostgreSQL connection pool initialized for database: 'default'" - ) + logger.info(f"PostgreSQL connection pool initialized for database: default") except Exception as e: logger.error(f"Failed to initialize postgresql connection pool: {e}") raise diff --git a/requirements.txt b/requirements.txt index cd35c03..0f77826 100644 Binary files a/requirements.txt and b/requirements.txt differ diff --git a/run_server.py b/run_server.py index 8610dc8..0e0d57e 100644 --- a/run_server.py +++ b/run_server.py @@ -14,7 +14,7 @@ if __name__ == "__main__": host="0.0.0.0", port=8000, loop="asyncio", # 强制使用 asyncio 事件循环 - # workers=4, # 如果需要多进程,可以取消注释 + workers=2, # 如果需要多进程,可以取消注释 ) server = uvicorn.Server(config) diff --git a/timescaledb/database.py b/timescaledb/database.py index 46c7420..448b6bf 100644 --- a/timescaledb/database.py +++ b/timescaledb/database.py @@ -21,7 +21,7 @@ class Database: try: self.pool = psycopg_pool.AsyncConnectionPool( conninfo=conn_string, - min_size=1, + min_size=5, max_size=20, open=False, # Don't open immediately, wait for startup kwargs={"row_factory": dict_row}, # Return rows as dictionaries diff --git a/timescaledb/schemas/realtime.py b/timescaledb/schemas/realtime.py index 202d6f8..3ef85d8 100644 --- a/timescaledb/schemas/realtime.py +++ b/timescaledb/schemas/realtime.py @@ -29,10 +29,24 @@ class RealtimeRepository: velocity = EXCLUDED.velocity """ async with conn.cursor() as cur: - await cur.executemany(query, [ - (item["time"], item["id"], item.get("flow"), item.get("friction"), item.get("headloss"), item.get("quality"), item.get("reaction"), item.get("setting"), item.get("status"), item.get("velocity")) - for item in data - ]) + await cur.executemany( + query, + [ + ( + item["time"], + item["id"], + item.get("flow"), + item.get("friction"), + item.get("headloss"), + item.get("quality"), + item.get("reaction"), + item.get("setting"), + item.get("status"), + item.get("velocity"), + ) + for item in data + ], + ) @staticmethod def insert_links_batch_sync(conn: Connection, data: List[dict]): @@ -53,10 +67,24 @@ class RealtimeRepository: velocity = EXCLUDED.velocity """ with conn.cursor() as cur: - cur.executemany(query, [ - (item["time"], item["id"], item.get("flow"), item.get("friction"), item.get("headloss"), item.get("quality"), item.get("reaction"), item.get("setting"), item.get("status"), item.get("velocity")) - for item in data - ]) + cur.executemany( + query, + [ + ( + item["time"], + item["id"], + item.get("flow"), + item.get("friction"), + item.get("headloss"), + item.get("quality"), + item.get("reaction"), + item.get("setting"), + item.get("status"), + item.get("velocity"), + ) + for item in data + ], + ) @staticmethod async def get_link_by_time_range( @@ -133,13 +161,13 @@ class RealtimeRepository: raise ValueError(f"Invalid field: {field}") query = sql.SQL( - "SELECT {} FROM realtime.link_simulation WHERE time >= %s AND time <= %s" + "SELECT id, {} FROM realtime.link_simulation WHERE time >= %s AND time <= %s" ).format(sql.Identifier(field)) async with conn.cursor() as cur: await cur.execute(query, (start_time, end_time)) - row = await cur.fetchone() - return row[field] if row else None + rows = await cur.fetchall() + return [{"id": row["id"], "value": row[field]} for row in rows] @staticmethod async def update_link_field( @@ -195,10 +223,20 @@ class RealtimeRepository: quality = EXCLUDED.quality """ async with conn.cursor() as cur: - await cur.executemany(query, [ - (item["time"], item["id"], item.get("actual_demand"), item.get("total_head"), item.get("pressure"), item.get("quality")) - for item in data - ]) + await cur.executemany( + query, + [ + ( + item["time"], + item["id"], + item.get("actual_demand"), + item.get("total_head"), + item.get("pressure"), + item.get("quality"), + ) + for item in data + ], + ) @staticmethod def insert_nodes_batch_sync(conn: Connection, data: List[dict]): @@ -214,10 +252,20 @@ class RealtimeRepository: quality = EXCLUDED.quality """ with conn.cursor() as cur: - cur.executemany(query, [ - (item["time"], item["id"], item.get("actual_demand"), item.get("total_head"), item.get("pressure"), item.get("quality")) - for item in data - ]) + cur.executemany( + query, + [ + ( + item["time"], + item["id"], + item.get("actual_demand"), + item.get("total_head"), + item.get("pressure"), + item.get("quality"), + ) + for item in data + ], + ) @staticmethod async def get_node_by_time_range( @@ -271,13 +319,13 @@ class RealtimeRepository: raise ValueError(f"Invalid field: {field}") query = sql.SQL( - "SELECT {} FROM realtime.node_simulation WHERE time >= %s AND time <= %s" + "SELECT id, {} FROM realtime.node_simulation WHERE time >= %s AND time <= %s" ).format(sql.Identifier(field)) async with conn.cursor() as cur: await cur.execute(query, (start_time, end_time)) - row = await cur.fetchone() - return row[field] if row else None + rows = await cur.fetchall() + return [{"id": row["id"], "value": row[field]} for row in rows] @staticmethod async def update_node_field( @@ -507,16 +555,20 @@ class RealtimeRepository: # Query based on type if type.lower() == "node": - return await RealtimeRepository.get_nodes_field_by_time_range( + data = await RealtimeRepository.get_nodes_field_by_time_range( conn, start_time, end_time, property ) elif type.lower() == "link": - return await RealtimeRepository.get_links_field_by_time_range( + data = await RealtimeRepository.get_links_field_by_time_range( conn, start_time, end_time, property ) else: raise ValueError(f"Invalid type: {type}. Must be 'node' or 'link'") + # Format the results + results = [{"ID": item["id"], "value": item["value"]} for item in data] + return {"results": results} + @staticmethod async def query_simulation_result_by_ID_time( conn: AsyncConnection, diff --git a/timescaledb/schemas/scheme.py b/timescaledb/schemas/scheme.py index 44060f5..b50a943 100644 --- a/timescaledb/schemas/scheme.py +++ b/timescaledb/schemas/scheme.py @@ -149,13 +149,13 @@ class SchemeRepository: raise ValueError(f"Invalid field: {field}") query = sql.SQL( - "SELECT {} FROM scheme.link_simulation WHERE scheme_type = %s AND scheme_name = %s AND time >= %s AND time <= %s" + "SELECT id, {} FROM scheme.link_simulation WHERE scheme_type = %s AND scheme_name = %s AND time >= %s AND time <= %s" ).format(sql.Identifier(field)) async with conn.cursor() as cur: await cur.execute(query, (scheme_type, scheme_name, start_time, end_time)) - row = await cur.fetchone() - return row[field] if row else None + rows = await cur.fetchall() + return [{"id": row["id"], "value": row[field]} for row in rows] @staticmethod async def update_link_field( @@ -313,13 +313,13 @@ class SchemeRepository: raise ValueError(f"Invalid field: {field}") query = sql.SQL( - "SELECT {} FROM scheme.node_simulation WHERE scheme_type = %s AND scheme_name = %s AND time >= %s AND time <= %s" + "SELECT id, {} FROM scheme.node_simulation WHERE scheme_type = %s AND scheme_name = %s AND time >= %s AND time <= %s" ).format(sql.Identifier(field)) async with conn.cursor() as cur: await cur.execute(query, (scheme_type, scheme_name, start_time, end_time)) - row = await cur.fetchone() - return row[field] if row else None + rows = await cur.fetchall() + return [{"id": row["id"], "value": row[field]} for row in rows] @staticmethod async def update_node_field( @@ -599,16 +599,20 @@ class SchemeRepository: # Query based on type if type.lower() == "node": - return await SchemeRepository.get_nodes_field_by_scheme_and_time_range( + data = await SchemeRepository.get_nodes_field_by_scheme_and_time_range( conn, scheme_type, scheme_name, start_time, end_time, property ) elif type.lower() == "link": - return await SchemeRepository.get_links_field_by_scheme_and_time_range( + data = await SchemeRepository.get_links_field_by_scheme_and_time_range( conn, scheme_type, scheme_name, start_time, end_time, property ) else: raise ValueError(f"Invalid type: {type}. Must be 'node' or 'link'") + # Format the results + results = [{"ID": item["id"], "value": item["value"]} for item in data] + return {"results": results} + @staticmethod async def query_scheme_simulation_result_by_ID_time( conn: AsyncConnection,