新增节点、管道计算值按时间、属性查询的 API;新增方案中节点、管道按时间、属性、方案名查询的 API;新增计算值按时间、方案查询所有计算值;强制模拟计算方法新增 开始时间和持续时间 参数,并返回计算状态

This commit is contained in:
JIANG
2025-10-31 17:09:44 +08:00
parent cb60a6e286
commit cb7ff006a2

131
main.py
View File

@@ -64,7 +64,7 @@ LINK_COUNT = 2
prjs = [] prjs = []
inpDir = "C:/inpfiles/" inpDir = "C:/inpfiles/"
tmpDir = "C:/tmpfiles/" tmpDir = "C:/tmpfiles/"
proj_name = project_info.name
lockedPrjs = {} lockedPrjs = {}
if not os.path.exists(inpDir): if not os.path.exists(inpDir):
@@ -191,8 +191,9 @@ async def startup_db():
logger.info("TJWater CloudService is starting...") logger.info("TJWater CloudService is starting...")
logger.info("**********************************************************") logger.info("**********************************************************")
# open 'szh' by default # open proj_name by default
open_project("szh") print(project_info.name)
open_project(project_info.name)
############################################################ ############################################################
@@ -2835,6 +2836,69 @@ async def fastapi_query_all_records_by_time(querytime: str) -> dict[str, list]:
return {"nodes": results[0], "links": results[1]} return {"nodes": results[0], "links": results[1]}
# def query_all_record_by_time_property(querytime: str, type: str, property: str, bucket: str = "realtime_simulation_result") -> tuple:
@app.get("/queryallrecordsbytimeproperty/")
async def fastapi_query_all_record_by_time_property(
querytime: str, type: str, property: str, bucket: str = "realtime_simulation_result"
) -> dict[str, list]:
results: tuple = influxdb_api.query_all_record_by_time_property(
query_time=querytime, type=type, property=property, bucket=bucket
)
return {"results": results}
@app.get("/queryallschemerecordsbytimeproperty/")
async def fastapi_query_all_scheme_record_by_time_property(
querytime: str,
type: str,
property: str,
schemename: str,
bucket: str = "scheme_simulation_result",
) -> dict[str, list]:
"""
查询指定方案某一时刻的所有记录,查询 'node''link' 的某一属性值
:param querytime: 查询时间,格式为 '2024-11-24T17:30:00+08:00'
:param type: 查询类型 'node''link'
:param property: 查询的属性字段名
:param schemename: 方案名称,如 "FANGAN1761124840355"
:param bucket: 数据存储的bucket名称
:return: 包含查询结果的字典
"""
results: list = influxdb_api.query_all_scheme_record_by_time_property(
query_time=querytime,
type=type,
property=property,
scheme_name=schemename,
bucket=bucket,
)
return {"results": results}
@app.get("/querysimulationrecordsbyidtime/")
async def fastapi_query_simulation_record_by_ids_time(
id: str, querytime: str, type: str, bucket: str = "realtime_simulation_result"
) -> dict[str, list]:
results: tuple = influxdb_api.query_simulation_result_by_ID_time(
ID=id, type=type, query_time=querytime, bucket=bucket
)
return {"results": results}
@app.get("/queryschemesimulationrecordsbyidtime/")
async def fastapi_query_scheme_simulation_record_by_ids_time(
scheme_name: str,
id: str,
querytime: str,
type: str,
bucket: str = "scheme_simulation_result",
) -> dict[str, list]:
results: tuple = influxdb_api.query_scheme_simulation_result_by_ID_time(
scheme_name=scheme_name, ID=id, type=type, query_time=querytime, bucket=bucket
)
return {"results": results}
@app.get("/queryallrecordsbydate/") @app.get("/queryallrecordsbydate/")
async def fastapi_query_all_records_by_date(querydate: str) -> dict: async def fastapi_query_all_records_by_date(querydate: str) -> dict:
# 缓存查询结果提高性能 # 缓存查询结果提高性能
@@ -3004,7 +3068,7 @@ async def fastapi_query_scada_data_by_device_id_and_time(ids: str, querytime: st
# 2025/05/04 DingZQ # 2025/05/04 DingZQ
# 对于SCADA的曲线数据我们需要有4 套数据值 # 对于SCAD的曲线数据我们需要有4 套数据值
# 1. 原始数据 # 1. 原始数据
# 2. 补充的数据 (补充前面第一步缺失的数据) # 2. 补充的数据 (补充前面第一步缺失的数据)
# 3. 清洗后的数据点 (用五角星表示) # 3. 清洗后的数据点 (用五角星表示)
@@ -3288,25 +3352,36 @@ async def fastapi_download_history_data_manually(
# DingZQ, 2025-05-17 # DingZQ, 2025-05-17
# 新增开始时间和持续时间参数
class Run_Simulation_Manually_by_Date(BaseModel): class Run_Simulation_Manually_by_Date(BaseModel):
""" """
name数据库名称 name数据库名称
simulation_date样式如 2025-05-04 simulation_date样式如 2025-05-04
start_time开始时间样式如 08:00:00
duration持续时间单位为分钟
""" """
name: str name: str
simulation_date: str simulation_date: str
start_time: str
duration: int
def run_simulation_manually_by_date(network_name: str, base_date: datetime) -> None: def run_simulation_manually_by_date(
# 循环生成96个时间点15分钟间隔 network_name: str, base_date: datetime, start_time: str, duration: int
for i in range(96): ) -> None:
# 计算当前时间偏移 # 解析开始时间
time_offset = timedelta(minutes=15 * i) start_hour, start_minute, start_second = map(int, start_time.split(":"))
start_datetime = base_date.replace(
hour=start_hour, minute=start_minute, second=start_second
)
# 生成完整时间对象 # 计算结束时间
current_time = base_date + time_offset end_datetime = start_datetime + timedelta(minutes=duration)
# 生成时间点每15分钟一个
current_time = start_datetime
while current_time < end_datetime:
# 格式化成ISO8601带时区格式 # 格式化成ISO8601带时区格式
iso_time = current_time.strftime("%Y-%m-%dT%H:%M:%S") + "+08:00" iso_time = current_time.strftime("%Y-%m-%dT%H:%M:%S") + "+08:00"
@@ -3317,11 +3392,14 @@ def run_simulation_manually_by_date(network_name: str, base_date: datetime) -> N
modify_pattern_start_time=iso_time, modify_pattern_start_time=iso_time,
) )
# 增加15分钟
current_time += timedelta(minutes=15)
@app.post("/runsimulationmanuallybydate/") @app.post("/runsimulationmanuallybydate/")
async def fastapi_run_simulation_manually_by_date( async def fastapi_run_simulation_manually_by_date(
data: Run_Simulation_Manually_by_Date, data: Run_Simulation_Manually_by_Date,
) -> None: ) -> dict[str, str]:
item = data.dict() item = data.dict()
print(f"item: {item}") print(f"item: {item}")
@@ -3332,6 +3410,7 @@ async def fastapi_run_simulation_manually_by_date(
raise HTTPException(status_code=409, detail="is in simulation") raise HTTPException(status_code=409, detail="is in simulation")
else: else:
print("file doesnt exists") print("file doesnt exists")
try:
simulation.query_corresponding_element_id_and_query_id(item["name"]) simulation.query_corresponding_element_id_and_query_id(item["name"])
simulation.query_corresponding_pattern_id_and_query_id(item["name"]) simulation.query_corresponding_pattern_id_and_query_id(item["name"])
region_result = simulation.query_non_realtime_region(item["name"]) region_result = simulation.query_non_realtime_region(item["name"])
@@ -3344,12 +3423,14 @@ async def fastapi_run_simulation_manually_by_date(
item["name"], region_result item["name"], region_result
) )
) )
globals.pipe_flow_region_patterns = simulation.query_pipe_flow_region_patterns( globals.pipe_flow_region_patterns = (
item["name"] simulation.query_pipe_flow_region_patterns(item["name"])
) )
globals.non_realtime_region_patterns = ( globals.non_realtime_region_patterns = (
simulation.query_non_realtime_region_patterns(item["name"], region_result) simulation.query_non_realtime_region_patterns(
item["name"], region_result
)
) )
( (
globals.source_outflow_region_patterns, globals.source_outflow_region_patterns,
@@ -3363,9 +3444,16 @@ async def fastapi_run_simulation_manually_by_date(
base_date = datetime.strptime(item["simulation_date"], "%Y-%m-%d") base_date = datetime.strptime(item["simulation_date"], "%Y-%m-%d")
thread = threading.Thread( thread = threading.Thread(
target=lambda: run_simulation_manually_by_date(item["name"], base_date) target=lambda: run_simulation_manually_by_date(
item["name"], base_date, item["start_time"], item["duration"]
)
) )
thread.start() thread.start()
thread.join() # 等待线程完成
return {"status": "success"}
except Exception as e:
return {"status": "error", "message": str(e)}
# thread.join() # thread.join()
# DingZQ 08152025 # DingZQ 08152025
@@ -3941,7 +4029,10 @@ async def get_dict(item: Item):
if __name__ == "__main__": if __name__ == "__main__":
# uvicorn.run(app, host="0.0.0.0", port=8000) # uvicorn.run(app, host="0.0.0.0", port=8000)
# url='http://127.0.0.1:8000/valve_close_analysis?network=beibeizone&start_time=2024-04-01T08:00:00Z&valve_IDs=GSD2307192058577780A3287D78&valve_IDs=GSD2307192058572E953B707226(S2)&duration=1800' # url='http://127.0.0.1:8000/valve_close_analysis?network=beibeizone&start_time=2024-04-01T08:00:00Z&valve_IDs=GSD2307192058577780A3287D78&valve_IDs=GSD2307192058572E953B707226(S2)&duration=1800'
url = "http://127.0.0.1:8000/burst_analysis?network=beibeizone&start_time=2024-04-01T08:00:00Z&burst_ID=ZBBGXSZW000001&duration=1800" # url='http://127.0.0.1:8000/burst_analysis?network=beibeizone&start_time=2024-04-01T08:00:00Z&burst_ID=ZBBGXSZW000001&duration=1800'
Request.get( # url = "http://192.168.1.36:8000/queryallschemeallrecords/?schemename=Fangan0817114448&querydate=2025-08-13&schemetype=burst_Analysis"
url, # response = Request.get(url)
)
# import requests
# response = requests.get(url)