Files
TJWaterServerBinary/app/api/v1/endpoints/timeseries/realtime.py
T
2026-03-26 16:09:17 +08:00

260 lines
8.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from fastapi import APIRouter, Depends, HTTPException, Query, Path, Body
from typing import List
from datetime import datetime
from psycopg import AsyncConnection
from app.infra.db.timescaledb.repositories.realtime import RealtimeRepository
from .dependencies import get_timescale_connection
router = APIRouter()
@router.post("/realtime/links/batch", status_code=201, summary="批量插入实时管道数据")
async def insert_realtime_links(
data: List[dict] = Body(..., description="管道数据列表,每项包含管道ID、时间戳等信息"),
conn: AsyncConnection = Depends(get_timescale_connection)
):
"""
批量插入实时管道数据
将管道的实时监测数据批量插入时间序列数据库。
Args:
data: 管道数据列表
Returns:
插入成功的记录数
"""
await RealtimeRepository.insert_links_batch(conn, data)
return {"message": f"Inserted {len(data)} records"}
@router.get("/realtime/links", summary="查询实时管道数据")
async def get_realtime_links(
start_time: datetime = Query(..., description="查询开始时间"),
end_time: datetime = Query(..., description="查询结束时间"),
conn: AsyncConnection = Depends(get_timescale_connection),
):
"""
查询指定时间范围内的实时管道数据
根据时间范围查询所有实时管道的监测值。
Args:
start_time: 查询开始时间
end_time: 查询结束时间
Returns:
实时管道数据列表
"""
return await RealtimeRepository.get_links_by_time_range(conn, start_time, end_time)
@router.delete("/realtime/links", summary="删除实时管道数据")
async def delete_realtime_links(
start_time: datetime = Query(..., description="删除开始时间"),
end_time: datetime = Query(..., description="删除结束时间"),
conn: AsyncConnection = Depends(get_timescale_connection),
):
"""
删除指定时间范围内的实时管道数据
删除在指定时间范围内的所有实时管道监测数据。
Args:
start_time: 删除开始时间
end_time: 删除结束时间
Returns:
删除结果信息
"""
await RealtimeRepository.delete_links_by_time_range(conn, start_time, end_time)
return {"message": "Deleted successfully"}
@router.patch("/realtime/links/{link_id}/field", summary="更新实时管道字段")
async def update_realtime_link_field(
link_id: str = Path(..., description="管道ID"),
time: datetime = Query(..., description="更新数据的时间戳"),
field: str = Query(..., description="要更新的字段名称"),
value: float = Query(..., description="更新的字段值"),
conn: AsyncConnection = Depends(get_timescale_connection),
):
"""
更新指定管道的字段值
更新实时管道在特定时间的某个字段数据。
Args:
link_id: 管道ID
time: 数据时间戳
field: 字段名称
value: 字段新值
Returns:
更新结果信息
Raises:
HTTPException: 当字段不存在或更新失败时返回400错误
"""
try:
await RealtimeRepository.update_link_field(conn, time, link_id, field, value)
return {"message": "Updated successfully"}
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
@router.post("/realtime/nodes/batch", status_code=201, summary="批量插入实时节点数据")
async def insert_realtime_nodes(
data: List[dict] = Body(..., description="节点数据列表,每项包含节点ID、时间戳等信息"),
conn: AsyncConnection = Depends(get_timescale_connection)
):
"""
批量插入实时节点数据
将节点的实时监测数据批量插入时间序列数据库。
Args:
data: 节点数据列表
Returns:
插入成功的记录数
"""
await RealtimeRepository.insert_nodes_batch(conn, data)
return {"message": f"Inserted {len(data)} records"}
@router.get("/realtime/nodes", summary="查询实时节点数据")
async def get_realtime_nodes(
start_time: datetime = Query(..., description="查询开始时间"),
end_time: datetime = Query(..., description="查询结束时间"),
conn: AsyncConnection = Depends(get_timescale_connection),
):
"""
查询指定时间范围内的实时节点数据
根据时间范围查询所有实时节点的监测值。
Args:
start_time: 查询开始时间
end_time: 查询结束时间
Returns:
实时节点数据列表
"""
return await RealtimeRepository.get_nodes_by_time_range(conn, start_time, end_time)
@router.delete("/realtime/nodes", summary="删除实时节点数据")
async def delete_realtime_nodes(
start_time: datetime = Query(..., description="删除开始时间"),
end_time: datetime = Query(..., description="删除结束时间"),
conn: AsyncConnection = Depends(get_timescale_connection),
):
"""
删除指定时间范围内的实时节点数据
删除在指定时间范围内的所有实时节点监测数据。
Args:
start_time: 删除开始时间
end_time: 删除结束时间
Returns:
删除结果信息
"""
await RealtimeRepository.delete_nodes_by_time_range(conn, start_time, end_time)
return {"message": "Deleted successfully"}
@router.post("/realtime/simulation/store", status_code=201, summary="存储实时模拟结果")
async def store_realtime_simulation_result(
node_result_list: List[dict] = Body(..., description="节点模拟结果列表"),
link_result_list: List[dict] = Body(..., description="管道模拟结果列表"),
result_start_time: str = Query(..., description="模拟结果开始时间"),
conn: AsyncConnection = Depends(get_timescale_connection),
):
"""
存储实时模拟结果到时间序列数据库
将节点和管道的实时模拟计算结果批量存储到TimescaleDB数据库。
Args:
node_result_list: 节点模拟结果列表
link_result_list: 管道模拟结果列表
result_start_time: 模拟结果对应的起始时间
Returns:
存储结果信息
"""
await RealtimeRepository.store_realtime_simulation_result(
conn, node_result_list, link_result_list, result_start_time
)
return {"message": "Simulation results stored successfully"}
@router.get("/realtime/query/by-time-property", summary="按时间和属性查询实时数据")
async def query_realtime_records_by_time_property(
query_time: str = Query(..., description="查询时间"),
type: str = Query(..., description="数据类型,pipe(管道)或 junction(节点)"),
property: str = Query(..., description="要查询的属性名称"),
conn: AsyncConnection = Depends(get_timescale_connection),
):
"""
按指定时间和属性查询所有实时监测数据
查询在特定时间点,所有指定类型元素的特定属性值。
Args:
query_time: 查询时间
type: 元素类型(pipe或junction
property: 属性名称
Returns:
查询结果列表
Raises:
HTTPException: 当查询参数无效时返回400错误
"""
try:
results = await RealtimeRepository.query_all_record_by_time_property(
conn, query_time, type, property
)
return {"results": results}
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
@router.get("/realtime/query/by-id-time", summary="按ID和时间查询实时模拟数据")
async def query_realtime_simulation_by_id_time(
id: str = Query(..., description="元素ID(管道ID或节点ID"),
type: str = Query(..., description="元素类型,pipe(管道)或 junction(节点)"),
query_time: str = Query(..., description="查询时间"),
conn: AsyncConnection = Depends(get_timescale_connection),
):
"""
按指定ID和时间查询实时模拟结果
查询特定元素在某一时间点的实时模拟数据。
Args:
id: 元素ID
type: 元素类型(pipe或junction
query_time: 查询时间
Returns:
模拟结果数据
Raises:
HTTPException: 当查询参数无效时返回400错误
"""
try:
results = await RealtimeRepository.query_simulation_result_by_id_time(
conn, id, type, query_time
)
return {"result": results}
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))