新增 SCADA 设备清洗 API;新增调试配置文件

This commit is contained in:
JIANG
2025-11-05 15:23:46 +08:00
parent 004e76572a
commit ac653fa2c1
3 changed files with 307 additions and 47 deletions

23
.vscode/launch.json vendored Normal file
View File

@@ -0,0 +1,23 @@
{
    // Use IntelliSense to learn about possible attributes.
    // Hover to view descriptions of existing attributes.
    // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
    "version": "0.2.0",
    "configurations": [
        {
            // Launch the FastAPI app via uvicorn under the VS Code Python debugger.
            "type": "debugpy",
            "request": "launch",
            "name": "Debug Uvicorn",
            "module": "uvicorn",
            "args": [
                "main:app",
                "--host",
                "0.0.0.0",
                "--port",
                "8000",
                // Single worker so breakpoints always hit the debugged process.
                "--workers",
                "1"
            ]
        }
    ]
}

View File

@@ -7,6 +7,7 @@ from influxdb_client import (
QueryApi,
WriteOptions,
DeleteApi,
WritePrecision,
)
from typing import List, Dict
from datetime import datetime, timedelta, timezone
@@ -191,6 +192,46 @@ def query_pg_scada_info_non_realtime(name: str) -> None:
print(f"查询时发生错误:{e}")
def query_pg_scada_info(name: str) -> list[dict]:
    """Fetch every row of ``public.scada_info`` from a local PostgreSQL database.

    :param name: database name to connect to (host is fixed at 127.0.0.1)
    :return: one dict per row with keys id / type / transmission_mode /
             transmission_frequency / reliability; an empty list on any error
    """
    columns = (
        "id",
        "type",
        "transmission_mode",
        "transmission_frequency",
        "reliability",
    )
    dsn = f"dbname={name} host=127.0.0.1"
    try:
        # Context managers close the cursor and connection even on error.
        with psycopg.connect(dsn) as conn:
            with conn.cursor() as cur:
                cur.execute(
                    """
                SELECT id, type, transmission_mode, transmission_frequency, reliability
                FROM public.scada_info;
                """
                )
                rows = cur.fetchall()
    except Exception as e:
        print(f"查询时发生错误:{e}")
        return []
    # Pair each row tuple with the column names to build the result dicts.
    return [dict(zip(columns, row)) for row in rows]
# 2025/03/23
def get_new_client() -> InfluxDBClient:
"""每次调用返回一个新的 InfluxDBClient 实例。"""
@@ -4478,9 +4519,13 @@ def delete_data(delete_date: str, bucket: str) -> None:
predicate = f'date="{delete_date}"'
delete_api: DeleteApi = client.delete_api()
delete_api.delete(start=start_time, stop=stop_time, predicate=predicate, bucket=bucket)
delete_api.delete(
start=start_time, stop=stop_time, predicate=predicate, bucket=bucket
)
# 2025/08/18 从文件导入scada数据xkl
#2025/08/18 从文件导入scada数据xkl
def import_data_from_file(file_path: str, bucket: str = "SCADA_data") -> None:
"""
从指定的CSV文件导入数据到InfluxDB的指定bucket中。
@@ -4490,9 +4535,13 @@ def import_data_from_file(file_path: str, bucket: str = "SCADA_data") -> None:
"""
client = get_new_client()
if not client.ping():
print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
print(
"{} -- Failed to connect to InfluxDB.".format(
datetime.now().strftime("%Y-%m-%d %H:%M:%S")
)
)
#清空指定bucket的数据
# 清空指定bucket的数据
# delete_api = DeleteApi(client)
# start = "1970-01-01T00:00:00Z"
# stop = "2100-01-01T00:00:00Z"
@@ -4502,35 +4551,40 @@ def import_data_from_file(file_path: str, bucket: str = "SCADA_data") -> None:
write_api = client.write_api(write_options=SYNCHRONOUS)
points_to_write = []
for _, row in df.iterrows():
scada_id = row['ScadaId']
value = row['Value']
time_str = row['Time']
scada_id = row["ScadaId"]
value = row["Value"]
time_str = row["Time"]
date_str = str(time_str)[:10] # 取前10位作为日期
try:
raw_value = float(value)
except (ValueError, TypeError):
raw_value = 0.0
point = Point("SCADA") \
.tag("date", date_str) \
.tag("description", None) \
.tag("device_ID", scada_id) \
.field("monitored_value", raw_value) \
.field("datacleaning_value", 0.0) \
.field("simulation_value", 0.0) \
.time(time_str, write_precision='s')
point = (
Point("SCADA")
.tag("date", date_str)
.tag("description", None)
.tag("device_ID", scada_id)
.field("monitored_value", raw_value)
.field("datacleaning_value", 0.0)
.field("simulation_value", 0.0)
.time(time_str, write_precision="s")
)
points_to_write.append(point)
# 批量写入数据
batch_size = 500
for i in range(0, len(points_to_write), batch_size):
batch = points_to_write[i:i+batch_size]
batch = points_to_write[i : i + batch_size]
write_api.write(bucket=bucket, record=batch)
print(f"Data imported from {file_path} to bucket {bucket} successfully.")
print(f"Total points written: {len(points_to_write)}")
write_api.close()
client.close()
#2025/08/28 从多列格式文件导入SCADA数据xkl
def import_multicolumn_data_from_file(file_path: str, raw: bool = True, bucket: str = "SCADA_data") -> None:
# 2025/08/28 从多列格式文件导入SCADA数据xkl
def import_multicolumn_data_from_file(
file_path: str, raw: bool = True, bucket: str = "SCADA_data"
) -> None:
"""
从指定的多列格式CSV文件导入数据到InfluxDB的指定bucket中。
:param file_path: CSV文件的路径
@@ -4541,13 +4595,18 @@ def import_multicolumn_data_from_file(file_path: str, raw: bool = True, bucket:
write_api = client.write_api(write_options=SYNCHRONOUS)
points_to_write = []
if not client.ping():
print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y/%m/%d %H:%M')))
print(
"{} -- Failed to connect to InfluxDB.".format(
datetime.now().strftime("%Y/%m/%d %H:%M")
)
)
def convert_to_iso(timestr):
# 假设原格式为 '2025/8/3 0:00'
dt = datetime.strptime(timestr, '%Y-%m-%d %H:%M:%S')
dt = datetime.strptime(timestr, "%Y-%m-%d %H:%M:%S")
return dt.isoformat()
with open(file_path, encoding='utf-8') as f:
with open(file_path, encoding="utf-8") as f:
reader = csv.reader(f)
header = next(reader)
device_ids = header[1:] # 第一列是time后面是device_ID
@@ -4562,20 +4621,22 @@ def import_multicolumn_data_from_file(file_path: str, raw: bool = True, bucket:
raw_value = 0.0
scada_id = device_ids[idx]
# 如果是原始数据直接使用Value列
point = Point("SCADA") \
.tag("date", iso_time.split('T')[0]) \
.tag("description", None) \
.tag("device_ID", scada_id) \
.field("monitored_value", raw_value) \
.field("datacleaning_value", 0.0) \
.field("simulation_value", 0.0) \
point = (
Point("SCADA")
.tag("date", iso_time.split("T")[0])
.tag("description", None)
.tag("device_ID", scada_id)
.field("monitored_value", raw_value)
.field("datacleaning_value", 0.0)
.field("simulation_value", 0.0)
.time(iso_time, WritePrecision.S)
)
points_to_write.append(point)
else:
for row in reader:
time_str = row[0]
iso_time = convert_to_iso(time_str)
# 如果不是原始数据直接使用datacleaning_value列
# 如果不是原始数据直接使用datacleaning_value列
for idx, value in enumerate(row[1:]):
scada_id = device_ids[idx]
try:
@@ -4583,26 +4644,102 @@ def import_multicolumn_data_from_file(file_path: str, raw: bool = True, bucket:
except (ValueError, TypeError):
datacleaning_value = 0.0
# 如果是清洗数据直接使用datacleaning_value列
point = Point("SCADA") \
.tag("date", iso_time.split('T')[0]) \
.tag("description", "None") \
.tag("device_ID", scada_id) \
.field("monitored_value", 0.0) \
.field("datacleaning_value",datacleaning_value) \
.field("simulation_value", 0.0) \
point = (
Point("SCADA")
.tag("date", iso_time.split("T")[0])
.tag("description", "None")
.tag("device_ID", scada_id)
.field("monitored_value", 0.0)
.field("datacleaning_value", datacleaning_value)
.field("simulation_value", 0.0)
.time(iso_time, WritePrecision.S)
)
points_to_write.append(point)
# 批量写入数据
batch_size = 1000
for i in range(0, len(points_to_write), batch_size):
batch = points_to_write[i:i+batch_size]
batch = points_to_write[i : i + batch_size]
write_api.write(bucket=bucket, record=batch)
print(f"Data imported from {file_path} to bucket {bucket} successfully.")
print(f"Total points written: {len(points_to_write)}")
write_api.close()
client.close()
# 从多列格式字典导入SCADA数据
def import_multicolumn_data_from_dict(
    data_dict: dict[str, list], raw: bool = True, bucket: str = "SCADA_data"
) -> None:
    """Import column-oriented SCADA data from a dict into an InfluxDB bucket.

    :param data_dict: column-oriented data: key "time" maps to the list of
        timestamps, every other key is a device ID mapped to that device's
        list of values (assumed the same length as the time list — TODO confirm)
    :param raw: True writes values into the monitored_value field,
        False writes them into datacleaning_value
    :param bucket: target bucket name, defaults to "SCADA_data"
    :return: None
    """
    client = get_new_client()
    if not client.ping():
        print(
            "{} -- Failed to connect to InfluxDB.".format(
                datetime.now().strftime("%Y/%m/%d %H:%M:%S")
            )
        )
        # Bug fix: previously execution fell through and still tried to build
        # and write points against an unreachable server; bail out instead.
        client.close()
        return
    write_api = client.write_api(write_options=SYNCHRONOUS)
    points_to_write = []
    # First key "time" is the time axis; every remaining key is a device ID.
    time_list = data_dict.get("time", [])
    device_ids = [key for key in data_dict.keys() if key != "time"]
    for i, time_str in enumerate(time_list):
        # Normalize timestamps (e.g. pandas Timestamp objects) to strings.
        if not isinstance(time_str, str):
            time_str = str(time_str)
        for device_id in device_ids:
            value = data_dict[device_id][i]
            try:
                float_value = float(value)
            except (ValueError, TypeError):
                float_value = 0.0  # non-numeric entries fall back to 0.0
            if raw:
                # Raw data: store the reading in monitored_value.
                # NOTE(review): this branch tags description=None while the
                # cleaned branch tags the string "None" — confirm intended.
                point = (
                    Point("SCADA")
                    .tag("date", time_str.split("T")[0])
                    .tag("description", None)
                    .tag("device_ID", device_id)
                    .field("monitored_value", float_value)
                    .field("datacleaning_value", 0.0)
                    .field("simulation_value", 0.0)
                    .time(time_str, WritePrecision.S)
                )
            else:
                # Cleaned data: store the reading in datacleaning_value.
                point = (
                    Point("SCADA")
                    .tag("date", time_str.split("T")[0])
                    .tag("description", "None")
                    .tag("device_ID", device_id)
                    .field("monitored_value", 0.0)
                    .field("datacleaning_value", float_value)
                    .field("simulation_value", 0.0)
                    .time(time_str, WritePrecision.S)
                )
            points_to_write.append(point)
    # Write in batches to keep individual requests small.
    batch_size = 1000
    for i in range(0, len(points_to_write), batch_size):
        batch = points_to_write[i : i + batch_size]
        write_api.write(bucket=bucket, record=batch)
    print(f"Data imported from dict to bucket {bucket} successfully.")
    print(f"Total points written: {len(points_to_write)}")
    write_api.close()
    client.close()
# 示例调用
if __name__ == "__main__":
@@ -4739,18 +4876,18 @@ if __name__ == "__main__":
# end_time='2024-03-26T23:59:00+08:00')
# print(result)
#示例import_data_from_file
#import_data_from_file(file_path='data/Flow_Timedata.csv', bucket='SCADA_data')
# 示例import_data_from_file
# import_data_from_file(file_path='data/Flow_Timedata.csv', bucket='SCADA_data')
# # 示例query_all_records_by_type_date
#result = query_all__records_by_type__date(type='node', query_date='2025-08-04')
# result = query_all__records_by_type__date(type='node', query_date='2025-08-04')
#示例query_all_records_by_date_hour
#result = query_all_records_by_date_hour(query_date='2025-08-04', query_hour=1)
# 示例query_all_records_by_date_hour
# result = query_all_records_by_date_hour(query_date='2025-08-04', query_hour=1)
# 示例import_multicolumn_data_from_file
# import_multicolumn_data_from_file(file_path='data/selected_Flow_Timedata2025_new_format_cleaned.csv', raw=False, bucket='SCADA_data')
#示例import_multicolumn_data_from_file
#import_multicolumn_data_from_file(file_path='data/selected_Flow_Timedata2025_new_format_cleaned.csv', raw=False, bucket='SCADA_data')
# client = InfluxDBClient(url="http://localhost:8086", token=token, org=org_name)
# delete_api = client.delete_api()
@@ -4760,4 +4897,3 @@ if __name__ == "__main__":
# delete_api.delete(start, stop, predicate, bucket="SCADA_data", org=org_name)
# client.close()

101
main.py
View File

@@ -4080,6 +4080,107 @@ async def fastapi_pressure_sensor_placement(
raise HTTPException(status_code=500, detail=f"执行失败: {str(e)}")
# 新增 SCADA 设备清洗接口
@app.post("/scadadevicedatacleaning/")
async def fastapi_scada_device_data_cleaning(
    network: str = Query(...),
    ids_list: List[str] = Query(...),
    start_time: str = Query(...),
    end_time: str = Query(...),
    user_name: str = Query(...),
) -> str:
    """Clean stored SCADA readings for the given devices and write them back.

    Workflow: fetch raw readings for the requested device IDs over the time
    window, group devices by their type from the ``scada_info`` table, run the
    type-specific cleaning routine ("pressure" / "pipe_flow"), strip the
    ``_cleaned`` column suffix and persist the cleaned series to InfluxDB.

    :param network: PostgreSQL database name holding the scada_info table
    :param ids_list: device IDs — note only the FIRST element is used and it
        is split on commas (the client sends one comma-joined string)
    :param start_time: query window start (format defined by the Influx helper)
    :param end_time: query window end
    :param user_name: requesting user (currently unused in the body)
    :return: the literal string "success"
    """
    import pandas as pd  # local import: pandas is used only by this endpoint
    item = {
        "network": network,
        "ids": ids_list,
        "start_time": start_time,
        "end_time": end_time,
        "user_name": user_name,
    }
    # Only ids_list[0] is consulted: the client passes one comma-joined string.
    query_ids_list = item["ids"][0].split(",")
    # Fetch the raw readings for the requested devices and time window.
    scada_data = influxdb_api.query_SCADA_data_by_device_ID_and_timerange(
        query_ids_list=query_ids_list,
        start_time=item["start_time"],
        end_time=item["end_time"],
    )
    # All SCADA device metadata for this network.
    scada_device_info = influxdb_api.query_pg_scada_info(item["network"])
    # Index metadata by device id for O(1) lookups below.
    scada_device_info_dict = {info["id"]: info for info in scada_device_info}
    # Group the requested devices by their declared type.
    type_groups = {}
    for device_id in query_ids_list:
        device_info = scada_device_info_dict.get(device_id, {})
        device_type = device_info.get("type", "unknown")
        if device_type not in type_groups:
            type_groups[device_type] = []
        type_groups[device_type].append(device_id)
    # Clean each supported device type as one batch.
    for device_type, device_ids in type_groups.items():
        if device_type not in ["pressure", "pipe_flow"]:
            continue  # skip unknown/unsupported types
        # Keep only devices of this type that actually returned data.
        type_scada_data = {
            device_id: scada_data[device_id]
            for device_id in device_ids
            if device_id in scada_data
        }
        if not type_scada_data:
            continue
        # assumes every device shares the same time axis — TODO confirm
        time_list = [record["time"] for record in next(iter(type_scada_data.values()))]
        # Build a DataFrame: first column time, then one value column per device.
        df = pd.DataFrame({"time": time_list})
        for device_id in device_ids:
            if device_id in type_scada_data:
                values = [record["value"] for record in type_scada_data[device_id]]
                df[device_id] = values
        # Drop the time column: the cleaning routines expect value columns only.
        value_df = df.drop(columns=["time"])
        # Dispatch to the type-specific cleaning routine.
        if device_type == "pressure":
            cleaned_value_df = api_ex.Pdataclean.clean_pressure_data_dict_km(value_df)
        elif device_type == "pipe_flow":
            cleaned_value_df = api_ex.Fdataclean.clean_flow_data_dict(value_df)
        cleaned_value_df = pd.DataFrame(cleaned_value_df)
        # Keep only the '_cleaned' output columns produced by the cleaner.
        cleaned_columns = [
            col for col in cleaned_value_df.columns if col.endswith("_cleaned")
        ]
        cleaned_value_df = cleaned_value_df[cleaned_columns]
        # Strip the '_cleaned' suffix so column names are device IDs again.
        cleaned_value_df = cleaned_value_df.rename(
            columns={
                col: col.replace("_cleaned", "") for col in cleaned_value_df.columns
            }
        )
        # Re-attach the time column as the first column.
        cleaned_df = pd.concat([df["time"], cleaned_value_df], axis=1)
        # Debug output to confirm the resulting column names.
        print(f"清洗后的列名: {cleaned_df.columns.tolist()}")
        # Write the cleaned series back (raw=False -> datacleaning_value field).
        influxdb_api.import_multicolumn_data_from_dict(
            data_dict=cleaned_df.to_dict("list"),  # {column_name: [values]} format
            raw=False,
        )
    return "success"
class Item(BaseModel):
str_info: str
dict_info: Optional[dict] = None