# (export metadata: 427 lines, 18 KiB, Python)
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List

from dateutil import parser
from influxdb_client import InfluxDBClient, BucketsApi, WriteApi, OrganizationsApi, Point, QueryApi
||
# InfluxDB connection settings.
# NOTE(review): a hard-coded API token committed to source control is a
# security risk — prefer loading it from an environment variable or config.
url = "http://localhost:8086"  # Replace with your InfluxDB instance address
token = "xGDM5RZqRJAuzAGS-otXUdC2NFdY75qJAjRLqAB4p5WcIIAlIUpOpT8_yA16AOHmJWerwQ_08gwb84sy42jnZQ=="  # Replace with your InfluxDB token
org_name = "TJWATEORG"  # Replace with your organization name
# Module-level client used as the default `client` argument of the helpers below.
client = InfluxDBClient(url=url, token=token, org=org_name)
||
def create_and_initialize_buckets(client: InfluxDBClient, org_name: str) -> None:
    """
    Create the application's three buckets and write one template point into
    each so the expected measurement/tag/field layout exists up front.

    :param client: Initialized InfluxDBClient instance.
    :param org_name: Name of the organization that will own the buckets.
    :raises ValueError: If no organization with the given name exists.
    """
    bucket_api = BucketsApi(client)
    write_api = client.write_api()
    org_api = OrganizationsApi(client)

    # Resolve the organization ID from its name.
    org = next((o for o in org_api.find_organizations() if o.name == org_name), None)
    if not org:
        raise ValueError(f"Organization '{org_name}' not found.")
    org_id = org.id
    print(f"Using Organization ID: {org_id}")

    # Buckets to create (empty retention rules = keep data forever).
    buckets = [
        {"name": "SCADA_data", "retention_rules": []},
        {"name": "realtime_data", "retention_rules": []},
        {"name": "scheme_simulation", "retention_rules": []}
    ]

    try:
        for bucket in buckets:
            # Skip buckets that already exist so the function can be re-run
            # safely (create_bucket raises on a duplicate name).
            if bucket_api.find_bucket_by_name(bucket["name"]) is not None:
                print(f"Bucket '{bucket['name']}' already exists; skipping creation.")
                continue

            created_bucket = bucket_api.create_bucket(
                bucket_name=bucket["name"],
                retention_rules=bucket["retention_rules"],
                org_id=org_id
            )
            print(f"Bucket '{bucket['name']}' created with ID: {created_bucket.id}")

            # Seed each new bucket with template points describing its schema.
            # NOTE(review): tags/fields written with value None are dropped by
            # the client, so only the non-None fields actually persist —
            # confirm this placeholder layout is what consumers expect.
            if bucket["name"] == "SCADA_data":
                point = Point("SCADA") \
                    .tag("date", None) \
                    .tag("type", None) \
                    .tag("device_ID", None) \
                    .field("monitored_value", 0.0) \
                    .field("datacleaning_value", 0.0) \
                    .field("simulation_value", 0.0) \
                    .time("2024-11-21T00:00:00Z")
                write_api.write(bucket="SCADA_data", org=org_name, record=point)
                print("Initialized SCADA_data with default structure.")

            elif bucket["name"] == "realtime_data":
                link_point = Point("link") \
                    .tag("date", None) \
                    .tag("ID", None) \
                    .field("flow", 0.0) \
                    .field("leakage", 0.0) \
                    .field("velocity", 0.0) \
                    .field("headloss", 0.0) \
                    .field("status", None) \
                    .field("setting", 0.0) \
                    .field("quality", 0.0) \
                    .time("2024-11-21T00:00:00Z")

                node_point = Point("node") \
                    .tag("date", None) \
                    .tag("ID", None) \
                    .field("head", 0.0) \
                    .field("pressure", 0.0) \
                    .field("actualdemand", 0.0) \
                    .field("demanddeficit", 0.0) \
                    .field("totalExternalOutflow", 0.0) \
                    .field("quality", 0.0) \
                    .time("2024-11-21T00:00:00Z")

                write_api.write(bucket="realtime_data", org=org_name, record=link_point)
                write_api.write(bucket="realtime_data", org=org_name, record=node_point)
                print("Initialized realtime_data with default structure.")

            elif bucket["name"] == "scheme_simulation":
                link_point = Point("link") \
                    .tag("date", None) \
                    .tag("ID", None) \
                    .tag("scheme_Type", None) \
                    .tag("scheme_Name", None) \
                    .field("flow", 0.0) \
                    .field("leakage", 0.0) \
                    .field("velocity", 0.0) \
                    .field("headloss", 0.0) \
                    .field("status", None) \
                    .field("setting", 0.0) \
                    .field("quality", 0.0) \
                    .time("2024-11-21T00:00:00Z")

                node_point = Point("node") \
                    .tag("date", None) \
                    .tag("ID", None) \
                    .tag("scheme_Type", None) \
                    .tag("scheme_Name", None) \
                    .field("head", 0.0) \
                    .field("pressure", 0.0) \
                    .field("actualdemand", 0.0) \
                    .field("demanddeficit", 0.0) \
                    .field("totalExternalOutflow", 0.0) \
                    .field("quality", 0.0) \
                    .time("2024-11-21T00:00:00Z")

                write_api.write(bucket="scheme_simulation", org=org_name, record=link_point)
                write_api.write(bucket="scheme_simulation", org=org_name, record=node_point)
                print("Initialized scheme_simulation with default structure.")
    finally:
        # The default write API batches asynchronously in the background;
        # close() flushes pending points so the template records are not
        # silently dropped when the function returns.
        write_api.close()

    print("All buckets created and initialized successfully.")
||
def store_realtime_data_to_influxdb(node_result_list: List[Dict[str, Any]], link_result_list: List[Dict[str, Any]],
                                    result_start_time: str,
                                    bucket: str = "realtime_data", client: InfluxDBClient = client):
    """
    Store real-time simulation results in an InfluxDB bucket.

    :param node_result_list: List of dicts, each {'node': <id>, 'result': [<data dict>, ...]}.
    :param link_result_list: List of dicts, each {'link': <id>, 'result': [<data dict>, ...]}.
    :param result_start_time: Simulation start time, e.g. '2024-11-24T17:30:00+08:00'.
    :param bucket: Target bucket name, defaults to "realtime_data".
    :param client: Initialized InfluxDBClient instance.
    :raises RuntimeError: If writing to InfluxDB fails.
    """
    if client.ping():
        print("Successfully connected to InfluxDB.")
    else:
        print("Failed to connect to InfluxDB.")

    try:
        write_api = client.write_api()
        try:
            date_str = result_start_time.split('T')[0]
            # Normalized timezone-aware ISO timestamp, applied to every point.
            time_beijing = datetime.strptime(result_start_time, '%Y-%m-%dT%H:%M:%S%z').isoformat()

            for result in node_result_list:
                # Extract the node ID and its list of result samples.
                node_id = result.get('node')
                data_list = result.get('result', [])

                for data in data_list:
                    # One point carries all fields for this node/timestamp.
                    # NOTE(review): fields written with value None
                    # (demanddeficit/totalExternalOutflow) are dropped by the
                    # client and never stored — confirm this is intended.
                    node_point = Point("node") \
                        .tag("date", date_str) \
                        .tag("ID", node_id) \
                        .field("head", data.get('head', 0.0)) \
                        .field("pressure", data.get('pressure', 0.0)) \
                        .field("actualdemand", data.get('demand', 0.0)) \
                        .field("demanddeficit", None) \
                        .field("totalExternalOutflow", None) \
                        .field("quality", data.get('quality', 0.0)) \
                        .time(time_beijing)
                    write_api.write(bucket=bucket, org=org_name, record=node_point)
            print(f"成功将 {len(node_result_list)} 条node数据写入 InfluxDB。")

            for result in link_result_list:
                link_id = result.get('link')
                data_list = result.get('result', [])

                for data in data_list:
                    link_point = Point("link") \
                        .tag("date", date_str) \
                        .tag("ID", link_id) \
                        .field("flow", data.get('flow', 0.0)) \
                        .field("velocity", data.get('velocity', 0.0)) \
                        .field("headloss", data.get('headloss', 0.0)) \
                        .field("quality", data.get('quality', 0.0)) \
                        .field("status", data.get('status', "UNKNOWN")) \
                        .field("setting", data.get('setting', 0.0)) \
                        .field("reaction", data.get('reaction', 0.0)) \
                        .field("friction", data.get('friction', 0.0)) \
                        .time(time_beijing)
                    write_api.write(bucket=bucket, org=org_name, record=link_point)
            print(f"成功将 {len(link_result_list)} 条link数据写入 InfluxDB。")
        finally:
            # The default write API batches asynchronously; close() flushes
            # pending points so data is not lost when the function returns.
            write_api.close()

    except Exception as e:
        raise RuntimeError(f"数据写入 InfluxDB 时发生错误: {e}")
||
def query_latest_record_by_ID(ID: str, type: str, bucket: str="realtime_data", client: InfluxDBClient=client) -> dict:
    """
    Query the most recent record (looking back 30 days) for a given element ID.

    :param ID: Element ID to look up (stored as the "ID" tag).
    :param type: Either "node" or "link"; selects the measurement.
    :param bucket: Bucket to query, defaults to "realtime_data".
    :param client: Initialized InfluxDBClient instance.
    :return: Dict with the latest record's fields, or None when nothing
             matches (or when ``type`` is neither "node" nor "link").
    """
    if client.ping():
        print("Successfully connected to InfluxDB.")
    else:
        print("Failed to connect to InfluxDB.")
    query_api = client.query_api()

    # Preserve the original behavior: an unknown type yields None, no query.
    if type not in ("node", "link"):
        return None

    # The node and link queries were identical except for the measurement
    # name, so build a single query instead of two duplicated blocks.
    # NOTE(review): bucket/ID are interpolated directly into Flux — ensure
    # they come from trusted input (no parameter binding is used here).
    flux_query = f'''
    from(bucket: "{bucket}")
        |> range(start: -30d) // look back over the last month
        |> filter(fn: (r) => r["_measurement"] == "{type}")
        |> filter(fn: (r) => r["ID"] == "{ID}")
        |> pivot(
            rowKey:["_time"],
            columnKey:["_field"],
            valueColumn:"_value"
        )
        |> sort(columns: ["_time"], desc: true)
        |> limit(n: 1)
    '''
    tables = query_api.query(flux_query)

    # At most one pivoted row comes back; map it to the caller-facing shape.
    for table in tables:
        for record in table.records:
            if type == "node":
                return {
                    "time": record["_time"],
                    "nodeID": ID,
                    "head": record["head"],
                    "pressure": record["pressure"],
                    "actualdemand": record["actualdemand"],
                    # demanddeficit / totalExternalOutflow are intentionally
                    # omitted (they are never stored as real values).
                    "quality": record["quality"]
                }
            return {
                "time": record["_time"],
                "linkID": ID,
                "flow": record["flow"],
                "velocity": record["velocity"],
                "headloss": record["headloss"],
                "quality": record["quality"],
                "status": record["status"],
                "setting": record["setting"],
                "reaction": record["reaction"],
                "friction": record["friction"]
            }
    return None  # no matching record found
||
def query_all_record_by_time(query_time: str, bucket: str="realtime_data", client: InfluxDBClient=client) -> tuple:
    """
    Fetch every 'node' and 'link' record stored at a given Beijing timestamp.

    :param query_time: Beijing time, e.g. '2024-11-24T17:30:00+08:00'.
    :param bucket: Bucket to query, defaults to "realtime_data".
    :param client: Initialized InfluxDBClient instance.
    :return: Tuple (node_records, link_records) of dict lists.
    """
    print("Successfully connected to InfluxDB." if client.ping()
          else "Failed to connect to InfluxDB.")
    query_api = client.query_api()

    # Convert the Beijing timestamp to UTC and query a +/-1 second window
    # around it to tolerate sub-second storage jitter.
    target_utc = datetime.fromisoformat(query_time).astimezone(timezone.utc)
    one_second = timedelta(seconds=1)
    utc_start_time = target_utc - one_second
    utc_stop_time = target_utc + one_second

    # Pivot fields into columns so each record row carries all field values.
    flux_query = f'''
    from(bucket: "{bucket}")
        |> range(start: {utc_start_time.isoformat()}, stop: {utc_stop_time.isoformat()})
        |> filter(fn: (r) => r["_measurement"] == "node" or r["_measurement"] == "link")
        |> pivot(
            rowKey:["_time"],
            columnKey:["_field"],
            valueColumn:"_value"
        )
    '''

    node_records, link_records = [], []
    for table in query_api.query(flux_query):
        for rec in table.records:
            kind = rec["_measurement"]
            if kind == "node":
                node_records.append({
                    "time": rec["_time"],
                    "ID": rec["ID"],
                    "head": rec["head"],
                    "pressure": rec["pressure"],
                    "actualdemand": rec["actualdemand"],
                    "quality": rec["quality"]
                })
            elif kind == "link":
                link_records.append({
                    "time": rec["_time"],
                    "linkID": rec["ID"],
                    "flow": rec["flow"],
                    "velocity": rec["velocity"],
                    "headloss": rec["headloss"],
                    "quality": rec["quality"],
                    "status": rec["status"],
                    "setting": rec["setting"],
                    "reaction": rec["reaction"],
                    "friction": rec["friction"]
                })

    return node_records, link_records
||
# DingZQ: returns a list of dicts, e.g. [{'time': <timestamp>, 'value': <value>}, ...]
|
||
def query_curve_by_ID_property_daterange(ID: str, type: str, property: str, start_date: str, end_date: str, bucket: str="realtime_data", client: InfluxDBClient=client) -> list:
    """
    Query the time series of one field for one element over a Beijing-time
    date range: `type` selects the measurement, `ID` the tag, `property`
    the field.

    :param ID: Element ID to look up (the "ID" tag).
    :param type: "node" or "link" (selects the measurement).
    :param property: Field name to read.
    :param start_date: Start date 'YYYY-MM-DD' (Beijing time, inclusive).
    :param end_date: End date 'YYYY-MM-DD' (Beijing time, inclusive).
    :param bucket: Bucket to query, defaults to "realtime_data".
    :param client: Initialized InfluxDBClient instance.
    :return: List of {"time": ..., "value": ...} dicts.
    :raises ValueError: If ``type`` is neither "node" nor "link".
    """
    if client.ping():
        print("Successfully connected to InfluxDB.")
    else:
        print("Failed to connect to InfluxDB.")
    query_api = client.query_api()

    # Determine the measurement from the element type.
    if type == "node":
        measurement = "node"
    elif type == "link":
        measurement = "link"
    else:
        raise ValueError(f"不支持的类型: {type}")

    # Beijing midnight of start_date corresponds to 16:00 UTC the day before.
    start_time = (datetime.strptime(start_date, "%Y-%m-%d") - timedelta(days=1)).replace(hour=16, minute=0, second=0, tzinfo=timezone.utc).isoformat()
    # Flux range() treats `stop` as exclusive, so stopping at 16:00:00Z of
    # end_date covers the full Beijing day; the previous 15:59:59 stop
    # silently dropped records in the last second of the day.
    stop_time = datetime.strptime(end_date, "%Y-%m-%d").replace(hour=16, minute=0, second=0, tzinfo=timezone.utc).isoformat()

    # NOTE(review): ID/property are interpolated directly into Flux — ensure
    # they come from trusted input.
    flux_query = f'''
    from(bucket: "{bucket}")
        |> range(start: {start_time}, stop: {stop_time})
        |> filter(fn: (r) => r["_measurement"] == "{measurement}")
        |> filter(fn: (r) => r["ID"] == "{ID}")
        |> filter(fn: (r) => r["_field"] == "{property}")
    '''

    tables = query_api.query(flux_query)

    # Flatten all returned tables into (time, value) pairs.
    return [
        {"time": record["_time"], "value": record["_value"]}
        for table in tables
        for record in table.records
    ]
||
# Example usage / manual smoke test.
if __name__ == "__main__":
    # NOTE(review): another hard-coded token — load from the environment instead.
    url = "http://localhost:8086"  # Replace with your InfluxDB instance address
    token = "Z4UZj9HuLwLlwoApywvT2nGVP3bwLy18y-sJQ7enzZlJd8YMzMWbBA6F-q4gBiZ-7-IqdxR5aR9LvicKiSNmnA=="  # Replace with your InfluxDB token
    org_name = "beibei"  # Replace with your organization name

    # client = InfluxDBClient(url=url, token=token)

    # # Check the connection / create the buckets once:
    # try:
    #     create_and_initialize_buckets(client, org_name)
    # except Exception as e:
    #     print(f"连接失败: {e}")
    # finally:
    #     client.close()

    # The context manager closes the client automatically on exit.
    with InfluxDBClient(url=url, token=token, org=org_name) as client:

        bucket_name = "realtime_data"  # bucket to query
        node_id = "ZBBDTZDP000022"  # node ID to query
        link_id = "ZBBGXSZW000002"
        #
        # # latest_record = query_latest_record_by_ID(ID=node_id, type="node", bucket=bucket_name, client=client)
        # latest_record = query_latest_record_by_ID(ID=link_id, type="link", bucket=bucket_name, client=client)
        #
        # if latest_record:
        #     print("最新记录:", latest_record)
        # else:
        #     print("未找到符合条件的记录。")

        # node_records, link_records = query_all_record_by_time(query_time="2024-11-25T06:00:00+08:00")
        # print("Node 数据:", node_records)
        # print("Link 数据:", link_records)

        curve_result = query_curve_by_ID_property_daterange(ID=node_id, type="node", property="head",
                                                            start_date="2024-11-25", end_date="2024-11-25")
        print(curve_result)