def query_pg_scada_info(name: str) -> list[dict]:
    """
    Fetch every row of ``public.scada_info`` from a PostgreSQL database.

    :param name: name of the database to connect to (host is 127.0.0.1)
    :return: one dict per row with the keys ``id``, ``type``,
        ``transmission_mode``, ``transmission_frequency`` and ``reliability``;
        an empty list when connecting or querying fails
    """
    # Column order must match the SELECT list below.
    columns = (
        "id",
        "type",
        "transmission_mode",
        "transmission_frequency",
        "reliability",
    )

    try:
        # Pass connection parameters as keywords rather than interpolating
        # `name` into a conninfo string: a database name containing spaces,
        # quotes or "key=value" text would otherwise corrupt (or inject)
        # connection parameters.
        with psycopg.connect(dbname=name, host="127.0.0.1") as conn:
            with conn.cursor() as cur:
                cur.execute(
                    """
                    SELECT id, type, transmission_mode, transmission_frequency, reliability
                    FROM public.scada_info;
                    """
                )
                return [dict(zip(columns, record)) for record in cur.fetchall()]
    except Exception as e:
        # Deliberate best-effort: report the failure and give callers an
        # empty result instead of propagating the error.
        print(f"查询时发生错误:{e}")
        return []
str(time_str)[:10] # 取前10位作为日期 try: raw_value = float(value) except (ValueError, TypeError): raw_value = 0.0 - point = Point("SCADA") \ - .tag("date", date_str) \ - .tag("description", None) \ - .tag("device_ID", scada_id) \ - .field("monitored_value", raw_value) \ - .field("datacleaning_value", 0.0) \ - .field("simulation_value", 0.0) \ - .time(time_str, write_precision='s') + point = ( + Point("SCADA") + .tag("date", date_str) + .tag("description", None) + .tag("device_ID", scada_id) + .field("monitored_value", raw_value) + .field("datacleaning_value", 0.0) + .field("simulation_value", 0.0) + .time(time_str, write_precision="s") + ) points_to_write.append(point) # 批量写入数据 batch_size = 500 for i in range(0, len(points_to_write), batch_size): - batch = points_to_write[i:i+batch_size] + batch = points_to_write[i : i + batch_size] write_api.write(bucket=bucket, record=batch) print(f"Data imported from {file_path} to bucket {bucket} successfully.") print(f"Total points written: {len(points_to_write)}") write_api.close() client.close() -#2025/08/28 从多列格式文件导入SCADA数据,xkl -def import_multicolumn_data_from_file(file_path: str, raw: bool = True, bucket: str = "SCADA_data") -> None: + +# 2025/08/28 从多列格式文件导入SCADA数据,xkl +def import_multicolumn_data_from_file( + file_path: str, raw: bool = True, bucket: str = "SCADA_data" +) -> None: """ 从指定的多列格式CSV文件导入数据到InfluxDB的指定bucket中。 :param file_path: CSV文件的路径 @@ -4541,13 +4595,18 @@ def import_multicolumn_data_from_file(file_path: str, raw: bool = True, bucket: write_api = client.write_api(write_options=SYNCHRONOUS) points_to_write = [] if not client.ping(): - print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y/%m/%d %H:%M'))) + print( + "{} -- Failed to connect to InfluxDB.".format( + datetime.now().strftime("%Y/%m/%d %H:%M") + ) + ) + def convert_to_iso(timestr): # 假设原格式为 '2025/8/3 0:00' - dt = datetime.strptime(timestr, '%Y-%m-%d %H:%M:%S') + dt = datetime.strptime(timestr, "%Y-%m-%d %H:%M:%S") return 
dt.isoformat() - with open(file_path, encoding='utf-8') as f: + with open(file_path, encoding="utf-8") as f: reader = csv.reader(f) header = next(reader) device_ids = header[1:] # 第一列是time,后面是device_ID @@ -4562,20 +4621,22 @@ def import_multicolumn_data_from_file(file_path: str, raw: bool = True, bucket: raw_value = 0.0 scada_id = device_ids[idx] # 如果是原始数据,直接使用Value列 - point = Point("SCADA") \ - .tag("date", iso_time.split('T')[0]) \ - .tag("description", None) \ - .tag("device_ID", scada_id) \ - .field("monitored_value", raw_value) \ - .field("datacleaning_value", 0.0) \ - .field("simulation_value", 0.0) \ + point = ( + Point("SCADA") + .tag("date", iso_time.split("T")[0]) + .tag("description", None) + .tag("device_ID", scada_id) + .field("monitored_value", raw_value) + .field("datacleaning_value", 0.0) + .field("simulation_value", 0.0) .time(iso_time, WritePrecision.S) + ) points_to_write.append(point) else: for row in reader: time_str = row[0] iso_time = convert_to_iso(time_str) - # 如果不是原始数据,直接使用datacleaning_value列 + # 如果不是原始数据,直接使用datacleaning_value列 for idx, value in enumerate(row[1:]): scada_id = device_ids[idx] try: @@ -4583,26 +4644,102 @@ def import_multicolumn_data_from_file(file_path: str, raw: bool = True, bucket: except (ValueError, TypeError): datacleaning_value = 0.0 # 如果是清洗数据,直接使用datacleaning_value列 - point = Point("SCADA") \ - .tag("date", iso_time.split('T')[0]) \ - .tag("description", "None") \ - .tag("device_ID", scada_id) \ - .field("monitored_value", 0.0) \ - .field("datacleaning_value",datacleaning_value) \ - .field("simulation_value", 0.0) \ + point = ( + Point("SCADA") + .tag("date", iso_time.split("T")[0]) + .tag("description", "None") + .tag("device_ID", scada_id) + .field("monitored_value", 0.0) + .field("datacleaning_value", datacleaning_value) + .field("simulation_value", 0.0) .time(iso_time, WritePrecision.S) + ) points_to_write.append(point) # 批量写入数据 batch_size = 1000 for i in range(0, len(points_to_write), batch_size): - batch = 
# Import SCADA data from a multi-column dictionary
def import_multicolumn_data_from_dict(
    data_dict: dict[str, list], raw: bool = True, bucket: str = "SCADA_data"
) -> None:
    """
    Write a column-oriented dictionary of SCADA values into an InfluxDB bucket.

    One point of measurement ``SCADA`` is written per (timestamp, device)
    pair. Values that cannot be converted to ``float`` are stored as ``0.0``.

    :param data_dict: column name -> list of values; the ``"time"`` key holds
        the timestamps and every other key is treated as a device ID whose
        list is indexed in step with the ``"time"`` list
    :param raw: True stores each value in ``monitored_value``; False stores it
        in ``datacleaning_value``. The other value fields are written as 0.0
    :param bucket: target bucket name, defaults to ``"SCADA_data"``
    :return: None
    :raises IndexError: if a device column is shorter than the ``"time"`` column
    """
    time_list = data_dict.get("time", [])
    device_ids = [key for key in data_dict if key != "time"]
    target_field = "monitored_value" if raw else "datacleaning_value"
    # Kept as in the original branches: raw points pass None as the
    # description tag value, cleaned points pass the literal string "None".
    description = None if raw else "None"

    client = get_new_client()
    try:
        if not client.ping():
            print(
                "{} -- Failed to connect to InfluxDB.".format(
                    datetime.now().strftime("%Y/%m/%d %H:%M:%S")
                )
            )

        points_to_write = []
        for i, time_value in enumerate(time_list):
            time_str = time_value if isinstance(time_value, str) else str(time_value)
            # str() of a datetime separates date and time with a space rather
            # than "T", so cut at whichever separator is present; otherwise
            # the whole timestamp would end up in the "date" tag.
            date_tag = time_str.split("T")[0].split(" ")[0]

            for device_id in device_ids:
                value = data_dict[device_id][i]
                try:
                    float_value = float(value)
                except (ValueError, TypeError):
                    float_value = 0.0

                fields = {
                    "monitored_value": 0.0,
                    "datacleaning_value": 0.0,
                    "simulation_value": 0.0,
                }
                fields[target_field] = float_value

                point = (
                    Point("SCADA")
                    .tag("date", date_tag)
                    .tag("description", description)
                    .tag("device_ID", device_id)
                )
                for field_name, field_value in fields.items():
                    point = point.field(field_name, field_value)
                points_to_write.append(point.time(time_str, WritePrecision.S))

        # Write in batches; close the write API even if a batch fails.
        write_api = client.write_api(write_options=SYNCHRONOUS)
        try:
            batch_size = 1000
            for start in range(0, len(points_to_write), batch_size):
                write_api.write(
                    bucket=bucket, record=points_to_write[start : start + batch_size]
                )
        finally:
            write_api.close()

        print(f"Data imported from dict to bucket {bucket} successfully.")
        print(f"Total points written: {len(points_to_write)}")
    finally:
        # Always release the client, including when building or writing
        # points raised.
        client.close()
# SCADA device data-cleaning endpoint
@app.post("/scadadevicedatacleaning/")
async def fastapi_scada_device_data_cleaning(
    network: str = Query(...),
    ids_list: List[str] = Query(...),
    start_time: str = Query(...),
    end_time: str = Query(...),
    user_name: str = Query(...),
) -> str:
    """
    Clean the stored SCADA series of the requested devices and write them back.

    The raw series are read from InfluxDB, grouped by the device ``type``
    recorded in the PostgreSQL ``scada_info`` table of ``network``, cleaned by
    the cleaner for that type (only ``pressure`` and ``pipe_flow`` are
    handled), and the cleaned values are written back with ``raw=False``.

    :param network: PostgreSQL database name used to look up device types
    :param ids_list: device IDs, given as repeated query parameters and/or as
        comma-separated strings
    :param start_time: start of the time range, passed through to the query
    :param end_time: end of the time range, passed through to the query
    :param user_name: accepted for interface compatibility; not used here
    :return: the string ``"success"``
    :raises HTTPException: 400 when no usable device ID was supplied
    """
    # NOTE(review): the calls below look synchronous and would block the
    # event loop inside this `async def` — confirm whether that matters here.
    import pandas as pd

    # Accept every entry of ids_list (not only the first one), split each on
    # commas, drop blanks and duplicates while preserving request order.
    query_ids_list = list(
        dict.fromkeys(
            device_id.strip()
            for entry in ids_list
            for device_id in entry.split(",")
            if device_id.strip()
        )
    )
    if not query_ids_list:
        raise HTTPException(status_code=400, detail="ids_list is empty")

    # Raw series per device: {device_id: [{"time": ..., "value": ...}, ...]}
    scada_data = influxdb_api.query_SCADA_data_by_device_ID_and_timerange(
        query_ids_list=query_ids_list,
        start_time=start_time,
        end_time=end_time,
    )

    # Device metadata of this network, indexed by device ID.
    scada_device_info_dict = {
        info["id"]: info for info in influxdb_api.query_pg_scada_info(network)
    }

    # Group the requested devices by their type ("unknown" when not found).
    type_groups = {}
    for device_id in query_ids_list:
        device_type = scada_device_info_dict.get(device_id, {}).get("type", "unknown")
        type_groups.setdefault(device_type, []).append(device_id)

    cleaners = {
        "pressure": api_ex.Pdataclean.clean_pressure_data_dict_km,
        "pipe_flow": api_ex.Fdataclean.clean_flow_data_dict,
    }
    suffix = "_cleaned"

    for device_type, device_ids in type_groups.items():
        cleaner = cleaners.get(device_type)
        if cleaner is None:
            continue  # no cleaner for this device type

        type_scada_data = {
            device_id: scada_data[device_id]
            for device_id in device_ids
            if device_id in scada_data
        }
        if not type_scada_data:
            continue

        # Assumes every device of the group shares the timestamps of the
        # first one; a series of a different length makes pandas raise.
        first_series = next(iter(type_scada_data.values()))
        df = pd.DataFrame({"time": [record["time"] for record in first_series]})
        for device_id, records in type_scada_data.items():
            df[device_id] = [record["value"] for record in records]

        # The cleaners take the value columns only.
        cleaned_value_df = pd.DataFrame(cleaner(df.drop(columns=["time"])))

        # Keep only the "<device_id>_cleaned" columns and strip that suffix
        # (the suffix only, so an ID containing "_cleaned" stays intact).
        cleaned_columns = [
            col for col in cleaned_value_df.columns if col.endswith(suffix)
        ]
        cleaned_value_df = cleaned_value_df[cleaned_columns].rename(
            columns={col: col[: -len(suffix)] for col in cleaned_columns}
        )
        cleaned_df = pd.concat([df["time"], cleaned_value_df], axis=1)

        # Debug output to confirm the resulting column names.
        print(f"清洗后的列名: {cleaned_df.columns.tolist()}")

        # Write the cleaned series back as {column_name: [values]}.
        influxdb_api.import_multicolumn_data_from_dict(
            data_dict=cleaned_df.to_dict("list"),
            raw=False,
        )

    return "success"