From 842abaceb2988bbb8bd94587c830ee6a2f2f1f8e Mon Sep 17 00:00:00 2001 From: DingZQ Date: Sat, 15 Feb 2025 15:43:09 +0800 Subject: [PATCH] Add API queryscadadatabydeviceidandtime --- influxdb_api.py | 53 +++++++++++++++++++++++++++++++++++++++++++++++++ main.py | 6 ++++++ 2 files changed, 59 insertions(+) diff --git a/influxdb_api.py b/influxdb_api.py index cce8e78..6ea4262 100644 --- a/influxdb_api.py +++ b/influxdb_api.py @@ -1017,6 +1017,59 @@ def query_SCADA_data_by_device_ID_and_time(query_ids_list: List[str], query_time return SCADA_result_dict +# DingZQ, 2025-02-15 +def query_SCADA_data_by_device_ID_and_time_range(query_ids_list: List[str], start_time: str, end_time: str, bucket: str="SCADA_data", client: InfluxDBClient=client) -> Dict[str, float]: + """ + 根据SCADA设备的ID和时间查询值 + :param query_ids_list: SCADA设备ID的列表 + :param start_time: 输入的北京时间,格式为 '2024-11-24T17:30:00+08:00'。 + :param end_time: 输入的北京时间,格式为 '2024-11-24T17:30:00+08:00'。 + :param bucket: InfluxDB 的 bucket 名称,默认值为 "SCADA_data"。 + :param client: 已初始化的 InfluxDBClient 实例。 + :return: + """ + if client.ping(): + print("{} -- Successfully connected to InfluxDB.".format( datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + else: + print("{} -- Failed to connect to InfluxDB.".format( datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + + query_api = client.query_api() + # 将北京时间转换为 UTC 时间 + utc_start_time = time_api.to_utc_time(start_time) + utc_end_time = time_api.to_utc_time(end_time) + + # 构建查询字典 + SCADA_result_dict = {} + + for device_id in query_ids_list: + # 构建 Flux 查询语句 + flux_query = f''' + from(bucket: "{bucket}") + |> range(start: {utc_start_time.isoformat()}, stop: {utc_end_time.isoformat()}) + |> filter(fn: (r) => r["device_ID"] == "{device_id}") + |> filter(fn: (r) => r["_field"] == "monitored_value") + ''' + + # 执行查询 + try: + result = query_api.query(flux_query) + + # 从查询结果中提取 monitored_value + if result: + # 假设返回的结果为一行数据 + for table in result: + for record in table.records: + # 获取字段 "_value" 即为 monitored_value + monitored_value = record.get_value() + SCADA_result_dict[device_id] = monitored_value + else: + # 如果没有结果,默认设置为 None 或其他值 + SCADA_result_dict[device_id] = None + except Exception as e: + print(f"Error querying InfluxDB for device ID {device_id}: {e}") + SCADA_result_dict[device_id] = None + + return SCADA_result_dict # 2025/02/01 def store_realtime_simulation_result_to_influxdb(node_result_list: List[Dict[str, any]], link_result_list: List[Dict[str, any]], diff --git a/main.py b/main.py index 7138483..2dddc88 100644 --- a/main.py +++ b/main.py @@ -2129,6 +2129,12 @@ async def query_scada_data_by_device_id_and_time(ids: str, querytime: str): logger.info(querytime) return influxdb_api.query_SCADA_data_by_device_ID_and_time(query_ids_list=query_ids, query_time=querytime, client=influx_client) +@app.get("/queryscadadatabydeviceidandtime/") +async def query_scada_data_by_device_id_and_time_range(ids: str, starttime: str, endtime: str): + query_ids = ids.split(',') + return influxdb_api.query_SCADA_data_by_device_ID_and_time_range(query_ids_list=query_ids, start_time=starttime, end_time=endtime, client=influx_client) + + @app.get("/queryinfluxdbbuckets/") async def fastapi_query_influxdb_buckets():