Update influxdb_api.py
This commit is contained in:
141
influxdb_api.py
141
influxdb_api.py
@@ -1,5 +1,4 @@
|
|||||||
from influxdb_client import InfluxDBClient, BucketsApi, WriteApi, OrganizationsApi, Point, QueryApi, WriteOptions
|
from influxdb_client import InfluxDBClient, BucketsApi, WriteApi, OrganizationsApi, Point, QueryApi, WriteOptions
|
||||||
from influxdb_client.client.exceptions import InfluxDBError
|
|
||||||
from typing import List, Dict
|
from typing import List, Dict
|
||||||
from datetime import datetime, timedelta, timezone
|
from datetime import datetime, timedelta, timezone
|
||||||
from influxdb_client.client.write_api import SYNCHRONOUS, ASYNCHRONOUS
|
from influxdb_client.client.write_api import SYNCHRONOUS, ASYNCHRONOUS
|
||||||
@@ -103,7 +102,8 @@ def query_pg_scada_info_non_realtime(name: str) -> None:
|
|||||||
open_project(name)
|
open_project(name)
|
||||||
dic_time = get_time(name)
|
dic_time = get_time(name)
|
||||||
globals.hydraulic_timestep = dic_time['HYDRAULIC TIMESTEP']
|
globals.hydraulic_timestep = dic_time['HYDRAULIC TIMESTEP']
|
||||||
close_project(name)
|
# DingZQ, 2025-03-21
|
||||||
|
#close_project(name)
|
||||||
# 连接数据库
|
# 连接数据库
|
||||||
conn_string = f"dbname={name} host=127.0.0.1"
|
conn_string = f"dbname={name} host=127.0.0.1"
|
||||||
try:
|
try:
|
||||||
@@ -231,7 +231,8 @@ def create_and_initialize_buckets(org_name: str) -> None:
|
|||||||
if not client.ping():
|
if not client.ping():
|
||||||
print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
|
print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
|
||||||
# 先删除原有的,然后再进行初始化
|
# 先删除原有的,然后再进行初始化
|
||||||
delete_buckets(org_name)
|
# delete_buckets(org_name)
|
||||||
|
|
||||||
bucket_api = BucketsApi(client)
|
bucket_api = BucketsApi(client)
|
||||||
# 本地变量,用于记录成功写入的数据点数量
|
# 本地变量,用于记录成功写入的数据点数量
|
||||||
points_written = 0
|
points_written = 0
|
||||||
@@ -284,8 +285,7 @@ def create_and_initialize_buckets(org_name: str) -> None:
|
|||||||
.tag("device_ID", None) \
|
.tag("device_ID", None) \
|
||||||
.field("monitored_value", 0.0) \
|
.field("monitored_value", 0.0) \
|
||||||
.field("datacleaning_value", 0.0) \
|
.field("datacleaning_value", 0.0) \
|
||||||
.field("datafilling_value", 0.0) \
|
.field("simulation_value", 0.0) \
|
||||||
.field("cleaned_value", 0.0) \
|
|
||||||
.time("2024-11-21T00:00:00Z", write_precision='s')
|
.time("2024-11-21T00:00:00Z", write_precision='s')
|
||||||
points_to_write.append(point)
|
points_to_write.append(point)
|
||||||
# write_api.write(bucket="SCADA_data", org=org_name, record=point)
|
# write_api.write(bucket="SCADA_data", org=org_name, record=point)
|
||||||
@@ -1500,6 +1500,7 @@ def query_all_SCADA_records_by_date(query_date: str, bucket: str="SCADA_data") -
|
|||||||
return SCADA_results
|
return SCADA_results
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
def query_SCADA_data_by_device_ID_and_time(query_ids_list: List[str], query_time: str, bucket: str="SCADA_data") -> Dict[str, float]:
|
def query_SCADA_data_by_device_ID_and_time(query_ids_list: List[str], query_time: str, bucket: str="SCADA_data") -> Dict[str, float]:
|
||||||
"""
|
"""
|
||||||
根据SCADA设备的ID和时间查询值
|
根据SCADA设备的ID和时间查询值
|
||||||
@@ -1545,6 +1546,7 @@ def query_SCADA_data_by_device_ID_and_time(query_ids_list: List[str], query_time
|
|||||||
print(f"Error querying InfluxDB for device ID {device_id}: {e}")
|
print(f"Error querying InfluxDB for device ID {device_id}: {e}")
|
||||||
SCADA_result_dict[device_id] = None
|
SCADA_result_dict[device_id] = None
|
||||||
client.close()
|
client.close()
|
||||||
|
|
||||||
return SCADA_result_dict
|
return SCADA_result_dict
|
||||||
|
|
||||||
|
|
||||||
@@ -1593,7 +1595,9 @@ def query_scheme_SCADA_data_by_device_ID_and_time(query_ids_list: List[str], que
|
|||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"Error querying InfluxDB for device ID {device_id}: {e}")
|
print(f"Error querying InfluxDB for device ID {device_id}: {e}")
|
||||||
SCADA_result_dict[device_id] = None
|
SCADA_result_dict[device_id] = None
|
||||||
|
|
||||||
client.close()
|
client.close()
|
||||||
|
|
||||||
return SCADA_result_dict
|
return SCADA_result_dict
|
||||||
|
|
||||||
# 2025/03/14
|
# 2025/03/14
|
||||||
@@ -1672,10 +1676,11 @@ def query_SCADA_data_by_device_ID_and_date(query_ids_list: List[str], query_date
|
|||||||
|
|
||||||
return query_SCADA_data_by_device_ID_and_time_range(query_ids_list, str(start_time), str(end_time), bucket)
|
return query_SCADA_data_by_device_ID_and_time_range(query_ids_list, str(start_time), str(end_time), bucket)
|
||||||
|
|
||||||
|
|
||||||
# 2025/04/17
|
# 2025/04/17
|
||||||
def query_cleaning_SCADA_data_by_device_ID_and_timerange(query_ids_list: List[str], start_time: str, end_time: str, bucket: str="SCADA_data"):
|
def query_cleaned_SCADA_data_by_device_ID_and_timerange(query_ids_list: List[str], start_time: str, end_time: str, bucket: str="SCADA_data"):
|
||||||
"""
|
"""
|
||||||
查询指定时间范围内,多个SCADA设备的修复的单个的数据
|
查询指定时间范围内,多个SCADA设备的清洗后的数据
|
||||||
:param query_ids_list: SCADA设备ID的列表
|
:param query_ids_list: SCADA设备ID的列表
|
||||||
:param start_time: 输入的北京时间,格式为 '2024-11-24T17:30:00+08:00'。
|
:param start_time: 输入的北京时间,格式为 '2024-11-24T17:30:00+08:00'。
|
||||||
:param end_time: 输入的北京时间,格式为 '2024-11-24T17:30:00+08:00'。
|
:param end_time: 输入的北京时间,格式为 '2024-11-24T17:30:00+08:00'。
|
||||||
@@ -1722,106 +1727,6 @@ def query_cleaning_SCADA_data_by_device_ID_and_timerange(query_ids_list: List[st
|
|||||||
return SCADA_dict
|
return SCADA_dict
|
||||||
|
|
||||||
|
|
||||||
# 2025/04/22
|
|
||||||
def query_filling_SCADA_data_by_device_ID_and_timerange(query_ids_list: List[str], start_time: str, end_time: str, bucket: str="SCADA_data"):
|
|
||||||
"""
|
|
||||||
查询指定时间范围内,多个SCADA设备的填补的单个的数据
|
|
||||||
:param query_ids_list: SCADA设备ID的列表
|
|
||||||
:param start_time: 输入的北京时间,格式为 '2024-11-24T17:30:00+08:00'。
|
|
||||||
:param end_time: 输入的北京时间,格式为 '2024-11-24T17:30:00+08:00'。
|
|
||||||
:param bucket: InfluxDB 的 bucket 名称,默认值为 "SCADA_data"。
|
|
||||||
:return:
|
|
||||||
"""
|
|
||||||
client = get_new_client()
|
|
||||||
if not client.ping():
|
|
||||||
print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
|
|
||||||
|
|
||||||
query_api = client.query_api()
|
|
||||||
print('start_time', start_time)
|
|
||||||
print('end_time', end_time)
|
|
||||||
# 将北京时间转换为 UTC 时间
|
|
||||||
beijing_start_time = datetime.fromisoformat(start_time)
|
|
||||||
print('beijing_start_time', beijing_start_time)
|
|
||||||
utc_start_time = time_api.to_utc_time(beijing_start_time)
|
|
||||||
print('utc_start_time', utc_start_time)
|
|
||||||
beijing_end_time = datetime.fromisoformat(end_time)
|
|
||||||
print('beijing_end_time', beijing_end_time)
|
|
||||||
utc_stop_time = time_api.to_utc_time(beijing_end_time)
|
|
||||||
print('utc_stop_time', utc_stop_time)
|
|
||||||
SCADA_dict = {}
|
|
||||||
for device_id in query_ids_list:
|
|
||||||
flux_query = f'''
|
|
||||||
from(bucket: "{bucket}")
|
|
||||||
|> range(start: {utc_start_time.isoformat()}, stop: {utc_stop_time.isoformat()})
|
|
||||||
|> filter(fn: (r) => r["device_ID"] == "{device_id}" and r["_field"] == "datafilling_value")
|
|
||||||
|> sort(columns: ["_time"])
|
|
||||||
'''
|
|
||||||
# 执行查询,返回一个 FluxTable 列表
|
|
||||||
tables = query_api.query(flux_query)
|
|
||||||
print(tables)
|
|
||||||
records_list = []
|
|
||||||
for table in tables:
|
|
||||||
for record in table.records:
|
|
||||||
# 获取记录的时间和监测值
|
|
||||||
records_list.append({
|
|
||||||
"time": record["_time"],
|
|
||||||
"value": record["_value"]
|
|
||||||
})
|
|
||||||
SCADA_dict[device_id] = records_list
|
|
||||||
client.close()
|
|
||||||
return SCADA_dict
|
|
||||||
|
|
||||||
|
|
||||||
# 2025/04/22
|
|
||||||
def query_cleaned_SCADA_data_by_device_ID_and_timerange(query_ids_list: List[str], start_time: str, end_time: str, bucket: str="SCADA_data"):
|
|
||||||
"""
|
|
||||||
查询指定时间范围内,多个SCADA设备的清洗完毕后的完整数据
|
|
||||||
:param query_ids_list: SCADA设备ID的列表
|
|
||||||
:param start_time: 输入的北京时间,格式为 '2024-11-24T17:30:00+08:00'。
|
|
||||||
:param end_time: 输入的北京时间,格式为 '2024-11-24T17:30:00+08:00'。
|
|
||||||
:param bucket: InfluxDB 的 bucket 名称,默认值为 "SCADA_data"。
|
|
||||||
:return:
|
|
||||||
"""
|
|
||||||
client = get_new_client()
|
|
||||||
if not client.ping():
|
|
||||||
print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
|
|
||||||
|
|
||||||
query_api = client.query_api()
|
|
||||||
print('start_time', start_time)
|
|
||||||
print('end_time', end_time)
|
|
||||||
# 将北京时间转换为 UTC 时间
|
|
||||||
beijing_start_time = datetime.fromisoformat(start_time)
|
|
||||||
print('beijing_start_time', beijing_start_time)
|
|
||||||
utc_start_time = time_api.to_utc_time(beijing_start_time)
|
|
||||||
print('utc_start_time', utc_start_time)
|
|
||||||
beijing_end_time = datetime.fromisoformat(end_time)
|
|
||||||
print('beijing_end_time', beijing_end_time)
|
|
||||||
utc_stop_time = time_api.to_utc_time(beijing_end_time)
|
|
||||||
print('utc_stop_time', utc_stop_time)
|
|
||||||
SCADA_dict = {}
|
|
||||||
for device_id in query_ids_list:
|
|
||||||
flux_query = f'''
|
|
||||||
from(bucket: "{bucket}")
|
|
||||||
|> range(start: {utc_start_time.isoformat()}, stop: {utc_stop_time.isoformat()})
|
|
||||||
|> filter(fn: (r) => r["device_ID"] == "{device_id}" and r["_field"] == "cleaned_value")
|
|
||||||
|> sort(columns: ["_time"])
|
|
||||||
'''
|
|
||||||
# 执行查询,返回一个 FluxTable 列表
|
|
||||||
tables = query_api.query(flux_query)
|
|
||||||
print(tables)
|
|
||||||
records_list = []
|
|
||||||
for table in tables:
|
|
||||||
for record in table.records:
|
|
||||||
# 获取记录的时间和监测值
|
|
||||||
records_list.append({
|
|
||||||
"time": record["_time"],
|
|
||||||
"value": record["_value"]
|
|
||||||
})
|
|
||||||
SCADA_dict[device_id] = records_list
|
|
||||||
client.close()
|
|
||||||
return SCADA_dict
|
|
||||||
|
|
||||||
|
|
||||||
# 2025/02/01
|
# 2025/02/01
|
||||||
def store_realtime_simulation_result_to_influxdb(node_result_list: List[Dict[str, any]], link_result_list: List[Dict[str, any]],
|
def store_realtime_simulation_result_to_influxdb(node_result_list: List[Dict[str, any]], link_result_list: List[Dict[str, any]],
|
||||||
result_start_time: str, bucket: str = "realtime_simulation_result"):
|
result_start_time: str, bucket: str = "realtime_simulation_result"):
|
||||||
@@ -2123,8 +2028,7 @@ def query_all_records_by_date(query_date: str, bucket: str="realtime_simulation_
|
|||||||
client = get_new_client()
|
client = get_new_client()
|
||||||
# 记录开始时间
|
# 记录开始时间
|
||||||
time_cost_start = time.perf_counter()
|
time_cost_start = time.perf_counter()
|
||||||
print('{} -- Hydraulic simulation started.'.format(
|
print('{} -- query_all_records_by_date started.'.format(datetime.now(pytz.timezone('Asia/Shanghai')).strftime('%Y-%m-%d %H:%M:%S')))
|
||||||
datetime.now(pytz.timezone('Asia/Shanghai')).strftime('%Y-%m-%d %H:%M:%S')))
|
|
||||||
|
|
||||||
if not client.ping():
|
if not client.ping():
|
||||||
print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
|
print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
|
||||||
@@ -3401,8 +3305,6 @@ def upload_cleaned_SCADA_data_to_influxdb(file_path: str, bucket: str="SCADA_dat
|
|||||||
datetime_value = datetime.strptime(row['time'], '%Y-%m-%d %H:%M:%S%z')
|
datetime_value = datetime.strptime(row['time'], '%Y-%m-%d %H:%M:%S%z')
|
||||||
# 处理datacleaning_value为空的情况
|
# 处理datacleaning_value为空的情况
|
||||||
datacleaning_value = float(row['datacleaning_value']) if row['datacleaning_value'] else None
|
datacleaning_value = float(row['datacleaning_value']) if row['datacleaning_value'] else None
|
||||||
datafilling_value = float(row['datafilling_value']) if row['datafilling_value'] else None
|
|
||||||
cleaned_value = float(row['cleaned_value']) if row['cleaned_value'] else None
|
|
||||||
# 处理monitored_value字段类型错误
|
# 处理monitored_value字段类型错误
|
||||||
try:
|
try:
|
||||||
monitored_value = float(row['monitored_value']) if row['monitored_value'] else None
|
monitored_value = float(row['monitored_value']) if row['monitored_value'] else None
|
||||||
@@ -3416,8 +3318,6 @@ def upload_cleaned_SCADA_data_to_influxdb(file_path: str, bucket: str="SCADA_dat
|
|||||||
'description': row['description'],
|
'description': row['description'],
|
||||||
'monitored_value': monitored_value,
|
'monitored_value': monitored_value,
|
||||||
'datacleaning_value': datacleaning_value,
|
'datacleaning_value': datacleaning_value,
|
||||||
'datafilling_value': datafilling_value,
|
|
||||||
'cleaned_value': cleaned_value,
|
|
||||||
'datetime': datetime_value
|
'datetime': datetime_value
|
||||||
})
|
})
|
||||||
|
|
||||||
@@ -3429,6 +3329,7 @@ def upload_cleaned_SCADA_data_to_influxdb(file_path: str, bucket: str="SCADA_dat
|
|||||||
write_api = client.write_api(write_options=SYNCHRONOUS)
|
write_api = client.write_api(write_options=SYNCHRONOUS)
|
||||||
# 写入数据
|
# 写入数据
|
||||||
for data in data_list:
|
for data in data_list:
|
||||||
|
print(data)
|
||||||
# 创建Point对象
|
# 创建Point对象
|
||||||
point = (
|
point = (
|
||||||
Point(data['measurement']) # measurement为mpointName
|
Point(data['measurement']) # measurement为mpointName
|
||||||
@@ -3437,8 +3338,6 @@ def upload_cleaned_SCADA_data_to_influxdb(file_path: str, bucket: str="SCADA_dat
|
|||||||
.tag('description', data['description'])
|
.tag('description', data['description'])
|
||||||
.field("monitored_value", data['monitored_value']) # field key为dataValue
|
.field("monitored_value", data['monitored_value']) # field key为dataValue
|
||||||
.field('datacleaning_value', data['datacleaning_value'])
|
.field('datacleaning_value', data['datacleaning_value'])
|
||||||
.field('datafilling_value', data['datafilling_value'])
|
|
||||||
.field('cleaned_value', data['cleaned_value'])
|
|
||||||
.time(data['datetime']) # 时间以datetime为准
|
.time(data['datetime']) # 时间以datetime为准
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -3462,11 +3361,11 @@ if __name__ == "__main__":
|
|||||||
|
|
||||||
client = InfluxDBClient(url=url, token=token)
|
client = InfluxDBClient(url=url, token=token)
|
||||||
# step1: 检查连接状态,初始化influxdb的buckets
|
# step1: 检查连接状态,初始化influxdb的buckets
|
||||||
# try:
|
try:
|
||||||
# # delete_buckets(org_name)
|
# delete_buckets(org_name)
|
||||||
# create_and_initialize_buckets(org_name)
|
create_and_initialize_buckets(org_name)
|
||||||
# except Exception as e:
|
except Exception as e:
|
||||||
# print(f"连接失败: {e}")
|
print(f"连接失败: {e}")
|
||||||
|
|
||||||
|
|
||||||
# step2: 先查询pg数据库中scada_info的信息,然后存储SCADA数据到SCADA_data这个bucket里
|
# step2: 先查询pg数据库中scada_info的信息,然后存储SCADA数据到SCADA_data这个bucket里
|
||||||
@@ -3485,7 +3384,7 @@ if __name__ == "__main__":
|
|||||||
# store_non_realtime_SCADA_data_to_influxdb(get_history_data_end_time='2025-03-08T12:00:00+08:00')
|
# store_non_realtime_SCADA_data_to_influxdb(get_history_data_end_time='2025-03-08T12:00:00+08:00')
|
||||||
|
|
||||||
# 示例3:download_history_data_manually
|
# 示例3:download_history_data_manually
|
||||||
# download_history_data_manually(begin_time='2025-04-17T00:00:00+08:00', end_time='2025-04-17T23:59:00+08:00')
|
# download_history_data_manually(begin_time='2025-04-16T00:00:00+08:00', end_time='2025-04-16T23:59:00+08:00')
|
||||||
|
|
||||||
# step3: 查询测试示例
|
# step3: 查询测试示例
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user