新增清洗数据 API
This commit is contained in:
@@ -2004,6 +2004,66 @@ def query_cleaning_SCADA_data_by_device_ID_and_timerange(
|
|||||||
|
|
||||||
return SCADA_dict
|
return SCADA_dict
|
||||||
|
|
||||||
|
# 查找 SCADA 模拟数据,返回的是一个list,list的内容是清洗后的正常值,表示为 time + value
|
||||||
|
def query_simulation_SCADA_data_by_device_ID_and_timerange(
|
||||||
|
query_ids_list: List[str],
|
||||||
|
start_time: str,
|
||||||
|
end_time: str,
|
||||||
|
bucket: str = "SCADA_data",
|
||||||
|
):
|
||||||
|
"""
|
||||||
|
查询指定时间范围内,多个SCADA设备的修复的单个的数据
|
||||||
|
:param query_ids_list: SCADA设备ID的列表
|
||||||
|
:param start_time: 输入的北京时间,格式为 '2023-11-24T17:30:00+08:00'。
|
||||||
|
:param end_time: 输入的北京时间,格式为 '2023-11-24T17:30:00+08:00'。
|
||||||
|
:param bucket: InfluxDB 的 bucket 名称,默认值为 "SCADA_data"。
|
||||||
|
:return:
|
||||||
|
"""
|
||||||
|
client = get_new_client()
|
||||||
|
if not client.ping():
|
||||||
|
print(
|
||||||
|
"{} -- Failed to connect to InfluxDB.".format(
|
||||||
|
datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
query_api = client.query_api()
|
||||||
|
print("start_time", start_time)
|
||||||
|
print("end_time", end_time)
|
||||||
|
# 将北京时间转换为 UTC 时间
|
||||||
|
beijing_start_time = datetime.fromisoformat(start_time)
|
||||||
|
print("beijing_start_time", beijing_start_time)
|
||||||
|
utc_start_time = time_api.to_utc_time(beijing_start_time)
|
||||||
|
print("utc_start_time", utc_start_time)
|
||||||
|
beijing_end_time = datetime.fromisoformat(end_time)
|
||||||
|
print("beijing_end_time", beijing_end_time)
|
||||||
|
utc_stop_time = time_api.to_utc_time(beijing_end_time)
|
||||||
|
print("utc_stop_time", utc_stop_time)
|
||||||
|
|
||||||
|
SCADA_dict = {}
|
||||||
|
for device_id in query_ids_list:
|
||||||
|
flux_query = f"""
|
||||||
|
from(bucket: "{bucket}")
|
||||||
|
|> range(start: {utc_start_time.isoformat()}, stop: {utc_stop_time.isoformat()})
|
||||||
|
|> filter(fn: (r) => r["device_ID"] == "{device_id}" and r["_field"] == "datacleaning_value")
|
||||||
|
|> sort(columns: ["_time"])
|
||||||
|
"""
|
||||||
|
# 执行查询,返回一个 FluxTable 列表
|
||||||
|
tables = query_api.query(flux_query)
|
||||||
|
print(tables)
|
||||||
|
records_list = []
|
||||||
|
for table in tables:
|
||||||
|
for record in table.records:
|
||||||
|
# 获取记录的时间和监测值
|
||||||
|
records_list.append(
|
||||||
|
{"time": record["_time"], "value": record["_value"]}
|
||||||
|
)
|
||||||
|
SCADA_dict[device_id] = records_list
|
||||||
|
|
||||||
|
client.close()
|
||||||
|
|
||||||
|
return SCADA_dict
|
||||||
|
|
||||||
|
|
||||||
# 2025/05/04 DingZQ
|
# 2025/05/04 DingZQ
|
||||||
# SCADA 数据原版缺失,根据历史数据的平均值补上缺失的部分
|
# SCADA 数据原版缺失,根据历史数据的平均值补上缺失的部分
|
||||||
|
|||||||
34
main.py
34
main.py
@@ -3120,6 +3120,18 @@ async def fastapi_query_cleaning_scada_data_by_device_id_and_time_range(
|
|||||||
return influxdb_api.query_cleaning_SCADA_data_by_device_ID_and_timerange(
|
return influxdb_api.query_cleaning_SCADA_data_by_device_ID_and_timerange(
|
||||||
query_ids_list=query_ids, start_time=starttime, end_time=endtime
|
query_ids_list=query_ids, start_time=starttime, end_time=endtime
|
||||||
)
|
)
|
||||||
|
# 查询到的SCADA模拟数据(从 realtime_simulation bucket 中查找)
|
||||||
|
@app.get("/querysimulationscadadatabydeviceidandtimerange/")
|
||||||
|
async def fastapi_query_simulation_scada_data_by_device_id_and_time_range(
|
||||||
|
ids: str, starttime: str, endtime: str
|
||||||
|
):
|
||||||
|
|
||||||
|
print(f"query_ids: {ids}, starttime: {starttime}, endtime: {endtime}")
|
||||||
|
|
||||||
|
query_ids = ids.split(",")
|
||||||
|
return influxdb_api.query_simulation_SCADA_data_by_device_ID_and_timerange(
|
||||||
|
query_ids_list=query_ids, start_time=starttime, end_time=endtime
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
# 查询指定时间范围内,多个SCADA设备的清洗后的数据
|
# 查询指定时间范围内,多个SCADA设备的清洗后的数据
|
||||||
@@ -4156,17 +4168,17 @@ async def fastapi_scada_device_data_cleaning(
|
|||||||
|
|
||||||
# 添加 time 列到首列
|
# 添加 time 列到首列
|
||||||
cleaned_value_df = pd.DataFrame(cleaned_value_df)
|
cleaned_value_df = pd.DataFrame(cleaned_value_df)
|
||||||
# 只选择以 '_cleaned' 结尾的清洗数据列
|
# # 只选择以 '_cleaned' 结尾的清洗数据列
|
||||||
cleaned_columns = [
|
# cleaned_columns = [
|
||||||
col for col in cleaned_value_df.columns if col.endswith("_cleaned")
|
# col for col in cleaned_value_df.columns if col.endswith("_cleaned")
|
||||||
]
|
# ]
|
||||||
cleaned_value_df = cleaned_value_df[cleaned_columns]
|
# cleaned_value_df = cleaned_value_df[cleaned_columns]
|
||||||
# 重命名列,移除 '_cleaned' 后缀
|
# # 重命名列,移除 '_cleaned' 后缀
|
||||||
cleaned_value_df = cleaned_value_df.rename(
|
# cleaned_value_df = cleaned_value_df.rename(
|
||||||
columns={
|
# columns={
|
||||||
col: col.replace("_cleaned", "") for col in cleaned_value_df.columns
|
# col: col.replace("_cleaned", "") for col in cleaned_value_df.columns
|
||||||
}
|
# }
|
||||||
)
|
# )
|
||||||
cleaned_df = pd.concat([df["time"], cleaned_value_df], axis=1)
|
cleaned_df = pd.concat([df["time"], cleaned_value_df], axis=1)
|
||||||
|
|
||||||
# 调试输出,确认列名
|
# 调试输出,确认列名
|
||||||
|
|||||||
Reference in New Issue
Block a user