完成timescaledb获取在线模拟数据的方法

This commit is contained in:
JIANG
2025-12-09 17:11:20 +08:00
parent b6c1ff1878
commit 18fc564efc
7 changed files with 93 additions and 39 deletions

View File

@@ -112,7 +112,7 @@ async def verify_token(authorization: Annotated[str, Header()] = None):
# 全局依赖项
# app = FastAPI(dependencies=[Depends(global_auth)])
app = FastAPI()
# app = FastAPI()
# 生命周期管理器

View File

@@ -21,14 +21,12 @@ class Database:
try:
self.pool = psycopg_pool.AsyncConnectionPool(
conninfo=conn_string,
min_size=1,
min_size=5,
max_size=20,
open=False, # Don't open immediately, wait for startup
kwargs={"row_factory": dict_row}, # Return rows as dictionaries
)
logger.info(
f"PostgreSQL connection pool initialized for database: 'default'"
)
logger.info(f"PostgreSQL connection pool initialized for database: default")
except Exception as e:
logger.error(f"Failed to initialize postgresql connection pool: {e}")
raise

Binary file not shown.

View File

@@ -14,7 +14,7 @@ if __name__ == "__main__":
host="0.0.0.0",
port=8000,
loop="asyncio", # 强制使用 asyncio 事件循环
# workers=4, # 如果需要多进程,可以取消注释
workers=2, # 如果需要多进程,可以取消注释
)
server = uvicorn.Server(config)

View File

@@ -21,7 +21,7 @@ class Database:
try:
self.pool = psycopg_pool.AsyncConnectionPool(
conninfo=conn_string,
min_size=1,
min_size=5,
max_size=20,
open=False, # Don't open immediately, wait for startup
kwargs={"row_factory": dict_row}, # Return rows as dictionaries

View File

@@ -29,10 +29,24 @@ class RealtimeRepository:
velocity = EXCLUDED.velocity
"""
async with conn.cursor() as cur:
await cur.executemany(query, [
(item["time"], item["id"], item.get("flow"), item.get("friction"), item.get("headloss"), item.get("quality"), item.get("reaction"), item.get("setting"), item.get("status"), item.get("velocity"))
for item in data
])
await cur.executemany(
query,
[
(
item["time"],
item["id"],
item.get("flow"),
item.get("friction"),
item.get("headloss"),
item.get("quality"),
item.get("reaction"),
item.get("setting"),
item.get("status"),
item.get("velocity"),
)
for item in data
],
)
@staticmethod
def insert_links_batch_sync(conn: Connection, data: List[dict]):
@@ -53,10 +67,24 @@ class RealtimeRepository:
velocity = EXCLUDED.velocity
"""
with conn.cursor() as cur:
cur.executemany(query, [
(item["time"], item["id"], item.get("flow"), item.get("friction"), item.get("headloss"), item.get("quality"), item.get("reaction"), item.get("setting"), item.get("status"), item.get("velocity"))
for item in data
])
cur.executemany(
query,
[
(
item["time"],
item["id"],
item.get("flow"),
item.get("friction"),
item.get("headloss"),
item.get("quality"),
item.get("reaction"),
item.get("setting"),
item.get("status"),
item.get("velocity"),
)
for item in data
],
)
@staticmethod
async def get_link_by_time_range(
@@ -133,13 +161,13 @@ class RealtimeRepository:
raise ValueError(f"Invalid field: {field}")
query = sql.SQL(
"SELECT {} FROM realtime.link_simulation WHERE time >= %s AND time <= %s"
"SELECT id, {} FROM realtime.link_simulation WHERE time >= %s AND time <= %s"
).format(sql.Identifier(field))
async with conn.cursor() as cur:
await cur.execute(query, (start_time, end_time))
row = await cur.fetchone()
return row[field] if row else None
rows = await cur.fetchall()
return [{"id": row["id"], "value": row[field]} for row in rows]
@staticmethod
async def update_link_field(
@@ -195,10 +223,20 @@ class RealtimeRepository:
quality = EXCLUDED.quality
"""
async with conn.cursor() as cur:
await cur.executemany(query, [
(item["time"], item["id"], item.get("actual_demand"), item.get("total_head"), item.get("pressure"), item.get("quality"))
for item in data
])
await cur.executemany(
query,
[
(
item["time"],
item["id"],
item.get("actual_demand"),
item.get("total_head"),
item.get("pressure"),
item.get("quality"),
)
for item in data
],
)
@staticmethod
def insert_nodes_batch_sync(conn: Connection, data: List[dict]):
@@ -214,10 +252,20 @@ class RealtimeRepository:
quality = EXCLUDED.quality
"""
with conn.cursor() as cur:
cur.executemany(query, [
(item["time"], item["id"], item.get("actual_demand"), item.get("total_head"), item.get("pressure"), item.get("quality"))
for item in data
])
cur.executemany(
query,
[
(
item["time"],
item["id"],
item.get("actual_demand"),
item.get("total_head"),
item.get("pressure"),
item.get("quality"),
)
for item in data
],
)
@staticmethod
async def get_node_by_time_range(
@@ -271,13 +319,13 @@ class RealtimeRepository:
raise ValueError(f"Invalid field: {field}")
query = sql.SQL(
"SELECT {} FROM realtime.node_simulation WHERE time >= %s AND time <= %s"
"SELECT id, {} FROM realtime.node_simulation WHERE time >= %s AND time <= %s"
).format(sql.Identifier(field))
async with conn.cursor() as cur:
await cur.execute(query, (start_time, end_time))
row = await cur.fetchone()
return row[field] if row else None
rows = await cur.fetchall()
return [{"id": row["id"], "value": row[field]} for row in rows]
@staticmethod
async def update_node_field(
@@ -507,16 +555,20 @@ class RealtimeRepository:
# Query based on type
if type.lower() == "node":
return await RealtimeRepository.get_nodes_field_by_time_range(
data = await RealtimeRepository.get_nodes_field_by_time_range(
conn, start_time, end_time, property
)
elif type.lower() == "link":
return await RealtimeRepository.get_links_field_by_time_range(
data = await RealtimeRepository.get_links_field_by_time_range(
conn, start_time, end_time, property
)
else:
raise ValueError(f"Invalid type: {type}. Must be 'node' or 'link'")
# Format the results
results = [{"ID": item["id"], "value": item["value"]} for item in data]
return {"results": results}
@staticmethod
async def query_simulation_result_by_ID_time(
conn: AsyncConnection,

View File

@@ -149,13 +149,13 @@ class SchemeRepository:
raise ValueError(f"Invalid field: {field}")
query = sql.SQL(
"SELECT {} FROM scheme.link_simulation WHERE scheme_type = %s AND scheme_name = %s AND time >= %s AND time <= %s"
"SELECT id, {} FROM scheme.link_simulation WHERE scheme_type = %s AND scheme_name = %s AND time >= %s AND time <= %s"
).format(sql.Identifier(field))
async with conn.cursor() as cur:
await cur.execute(query, (scheme_type, scheme_name, start_time, end_time))
row = await cur.fetchone()
return row[field] if row else None
rows = await cur.fetchall()
return [{"id": row["id"], "value": row[field]} for row in rows]
@staticmethod
async def update_link_field(
@@ -313,13 +313,13 @@ class SchemeRepository:
raise ValueError(f"Invalid field: {field}")
query = sql.SQL(
"SELECT {} FROM scheme.node_simulation WHERE scheme_type = %s AND scheme_name = %s AND time >= %s AND time <= %s"
"SELECT id, {} FROM scheme.node_simulation WHERE scheme_type = %s AND scheme_name = %s AND time >= %s AND time <= %s"
).format(sql.Identifier(field))
async with conn.cursor() as cur:
await cur.execute(query, (scheme_type, scheme_name, start_time, end_time))
row = await cur.fetchone()
return row[field] if row else None
rows = await cur.fetchall()
return [{"id": row["id"], "value": row[field]} for row in rows]
@staticmethod
async def update_node_field(
@@ -599,16 +599,20 @@ class SchemeRepository:
# Query based on type
if type.lower() == "node":
return await SchemeRepository.get_nodes_field_by_scheme_and_time_range(
data = await SchemeRepository.get_nodes_field_by_scheme_and_time_range(
conn, scheme_type, scheme_name, start_time, end_time, property
)
elif type.lower() == "link":
return await SchemeRepository.get_links_field_by_scheme_and_time_range(
data = await SchemeRepository.get_links_field_by_scheme_and_time_range(
conn, scheme_type, scheme_name, start_time, end_time, property
)
else:
raise ValueError(f"Invalid type: {type}. Must be 'node' or 'link'")
# Format the results
results = [{"ID": item["id"], "value": item["value"]} for item in data]
return {"results": results}
@staticmethod
async def query_scheme_simulation_result_by_ID_time(
conn: AsyncConnection,