# TJWaterServer/influxdb_api.py
# Last modified: 2025-01-30 21:39:40 +08:00
# NOTE: the lines above/below were copied from a web code-browser page; the
# page chrome ("Files", "Raw Blame History", Unicode warning) has been
# replaced by this comment header so the module remains valid Python.
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List

from dateutil import parser
from influxdb_client import (BucketsApi, InfluxDBClient, OrganizationsApi,
                             Point, QueryApi, WriteApi)
from influxdb_client.client.write_api import SYNCHRONOUS
# influxdb数据库连接信息
url = "http://localhost:8086" # 替换为你的InfluxDB实例地址
token = "xGDM5RZqRJAuzAGS-otXUdC2NFdY75qJAjRLqAB4p5WcIIAlIUpOpT8_yA16AOHmJWerwQ_08gwb84sy42jnZQ==" # 替换为你的InfluxDB Token
org_name = "TJWATEORG" # 替换为你的Organization名称
client = InfluxDBClient(url=url, token=token, org=org_name)
def create_and_initialize_buckets(client: InfluxDBClient, org_name: str) -> None:
bucket_api = BucketsApi(client)
write_api = client.write_api()
org_api = OrganizationsApi(client)
# 获取 org_id
org = next((o for o in org_api.find_organizations() if o.name == org_name), None)
if not org:
raise ValueError(f"Organization '{org_name}' not found.")
org_id = org.id
print(f"Using Organization ID: {org_id}")
# 定义 Buckets 信息
buckets = [
{"name": "SCADA_data", "retention_rules": []},
{"name": "realtime_data", "retention_rules": []},
{"name": "scheme_simulation", "retention_rules": []}
]
# 创建 Buckets 并初始化数据
for bucket in buckets:
# 创建 Bucket
created_bucket = bucket_api.create_bucket(
bucket_name=bucket["name"],
retention_rules=bucket["retention_rules"],
org_id=org_id
)
print(f"Bucket '{bucket['name']}' created with ID: {created_bucket.id}")
# 根据 Bucket 初始化数据
if bucket["name"] == "SCADA_data":
point = Point("SCADA") \
.tag("date", None) \
.tag("type", None) \
.tag("device_ID", None) \
.field("monitored_value", 0.0) \
.field("datacleaning_value", 0.0) \
.field("simulation_value", 0.0) \
.time("2024-11-21T00:00:00Z")
write_api.write(bucket="SCADA_data", org=org_name, record=point)
print("Initialized SCADA_data with default structure.")
elif bucket["name"] == "realtime_data":
link_point = Point("link") \
.tag("date", None) \
.tag("ID", None) \
.field("flow", 0.0) \
.field("leakage", 0.0) \
.field("velocity", 0.0) \
.field("headloss", 0.0) \
.field("status", None) \
.field("setting", 0.0) \
.field("quality", 0.0) \
.time("2024-11-21T00:00:00Z")
node_point = Point("node") \
.tag("date", None) \
.tag("ID", None) \
.field("head", 0.0) \
.field("pressure", 0.0) \
.field("actualdemand", 0.0) \
.field("demanddeficit", 0.0) \
.field("totalExternalOutflow", 0.0) \
.field("quality", 0.0) \
.time("2024-11-21T00:00:00Z")
write_api.write(bucket="realtime_data", org=org_name, record=link_point)
write_api.write(bucket="realtime_data", org=org_name, record=node_point)
print("Initialized realtime_data with default structure.")
elif bucket["name"] == "scheme_simulation":
link_point = Point("link") \
.tag("date", None) \
.tag("ID", None) \
.tag("scheme_Type", None) \
.tag("scheme_Name", None) \
.field("flow", 0.0) \
.field("leakage", 0.0) \
.field("velocity", 0.0) \
.field("headloss", 0.0) \
.field("status", None) \
.field("setting", 0.0) \
.field("quality", 0.0) \
.time("2024-11-21T00:00:00Z")
node_point = Point("node") \
.tag("date", None) \
.tag("ID", None) \
.tag("scheme_Type", None) \
.tag("scheme_Name", None) \
.field("head", 0.0) \
.field("pressure", 0.0) \
.field("actualdemand", 0.0) \
.field("demanddeficit", 0.0) \
.field("totalExternalOutflow", 0.0) \
.field("quality", 0.0) \
.time("2024-11-21T00:00:00Z")
write_api.write(bucket="scheme_simulation", org=org_name, record=link_point)
write_api.write(bucket="scheme_simulation", org=org_name, record=node_point)
print("Initialized scheme_simulation with default structure.")
print("All buckets created and initialized successfully.")
def store_realtime_data_to_influxdb(node_result_list: List[Dict[str, any]], link_result_list: List[Dict[str, any]],
result_start_time: str,
bucket: str = "realtime_data", client: InfluxDBClient = client):
"""
将实时数据存储到 InfluxDB 的realtime_data这个bucket中。
:param node_result_list: (List[Dict[str, any]]): 包含节点和结果数据的字典列表。
:param link_result_list: (List[Dict[str, any]]): 包含连接和结果数据的字典列表。
:param result_start_time: (str): 计算结果的模拟开始时间。
:param bucket: (str): InfluxDB 的 bucket 名称,默认值为 "realtime_data"
:param client: (InfluxDBClient): 已初始化的 InfluxDBClient 实例。
:return:
"""
if client.ping():
print("Successfully connected to InfluxDB.")
else:
print("Failed to connect to InfluxDB.")
# 开始写入数据
try:
write_api = client.write_api()
date_str = result_start_time.split('T')[0]
time_beijing = datetime.strptime(result_start_time, '%Y-%m-%dT%H:%M:%S%z').isoformat()
for result in node_result_list:
# 提取节点信息和结果数据
node_id = result.get('node')
data_list = result.get('result', [])
for data in data_list:
# 构建 Point 数据,多个 field 存在于一个数据点中
node_point = Point("node") \
.tag("date", date_str) \
.tag("ID", node_id) \
.field("head", data.get('head', 0.0)) \
.field("pressure", data.get('pressure', 0.0)) \
.field("actualdemand", data.get('demand', 0.0)) \
.field("demanddeficit", None) \
.field("totalExternalOutflow", None) \
.field("quality", data.get('quality', 0.0)) \
.time(time_beijing)
# 写入数据到 InfluxDB多个 field 在同一个 point 中
write_api.write(bucket=bucket, org=org_name, record=node_point)
print(f"成功将 {len(node_result_list)} 条node数据写入 InfluxDB。")
for result in link_result_list:
link_id = result.get('link')
data_list = result.get('result', [])
for data in data_list:
link_point = Point("link") \
.tag("date", date_str) \
.tag("ID", link_id) \
.field("flow", data.get('flow', 0.0)) \
.field("velocity", data.get('velocity', 0.0)) \
.field("headloss", data.get('headloss', 0.0)) \
.field("quality", data.get('quality', 0.0)) \
.field("status", data.get('status', "UNKNOWN")) \
.field("setting", data.get('setting', 0.0)) \
.field("reaction", data.get('reaction', 0.0)) \
.field("friction", data.get('friction', 0.0)) \
.time(time_beijing)
write_api.write(bucket=bucket, org=org_name, record=link_point)
print(f"成功将 {len(link_result_list)} 条link数据写入 InfluxDB。")
except Exception as e:
raise RuntimeError(f"数据写入 InfluxDB 时发生错误: {e}")
def query_latest_record_by_ID(ID: str, type: str, bucket: str="realtime_data", client: InfluxDBClient=client) -> dict:
"""
查询指定ID的最新的一条记录
:param ID: (str): 要查询的 ID。
:param type: (str): "node"或“link”
:param bucket: (str): 数据存储的 bucket 名称。
:param client: (InfluxDBClient): 已初始化的 InfluxDB 客户端实例。
:return: dict: 最新记录的数据,如果没有找到则返回 None。
"""
if client.ping():
print("Successfully connected to InfluxDB.")
else:
print("Failed to connect to InfluxDB.")
query_api = client.query_api()
if type == "node":
flux_query = f'''
from(bucket: "{bucket}")
|> range(start: -30d) // 查找最近一月的记录
|> filter(fn: (r) => r["_measurement"] == "node")
|> filter(fn: (r) => r["ID"] == "{ID}")
|> pivot(
rowKey:["_time"],
columnKey:["_field"],
valueColumn:"_value"
)
|> sort(columns: ["_time"], desc: true)
|> limit(n: 1)
'''
tables = query_api.query(flux_query)
# 解析查询结果
for table in tables:
for record in table.records:
return {
"time": record["_time"],
"nodeID": ID,
"head": record["head"],
"pressure": record["pressure"],
"actualdemand": record["actualdemand"],
# "demanddeficit": record["demanddeficit"],
# "totalExternalOutflow": record["totalExternalOutflow"],
"quality": record["quality"]
}
elif type == "link":
flux_query = f'''
from(bucket: "{bucket}")
|> range(start: -30d) // 查找最近一月的记录
|> filter(fn: (r) => r["_measurement"] == "link")
|> filter(fn: (r) => r["ID"] == "{ID}")
|> pivot(
rowKey:["_time"],
columnKey:["_field"],
valueColumn:"_value"
)
|> sort(columns: ["_time"], desc: true)
|> limit(n: 1)
'''
tables = query_api.query(flux_query)
# 解析查询结果
for table in tables:
for record in table.records:
return {
"time": record["_time"],
"linkID": ID,
"flow": record["flow"],
"velocity": record["velocity"],
"headloss": record["headloss"],
"quality": record["quality"],
"status": record["status"],
"setting": record["setting"],
"reaction": record["reaction"],
"friction": record["friction"]
}
return None # 如果没有找到记录
def query_all_record_by_time(query_time: str, bucket: str="realtime_data", client: InfluxDBClient=client) -> tuple:
"""
查询指定北京时间的所有记录,包括 'node''link' measurement分别以指定格式返回。
:param query_time: (str): 输入的北京时间,格式为 '2024-11-24T17:30:00+08:00'
:param bucket: (str): 数据存储的 bucket 名称。
:param client: (InfluxDBClient): 已初始化的 InfluxDBClient 实例。
:return: dict: tuple: (node_records, link_records)
"""
if client.ping():
print("Successfully connected to InfluxDB.")
else:
print("Failed to connect to InfluxDB.")
query_api = client.query_api()
# 将北京时间转换为 UTC 时间
beijing_time = datetime.fromisoformat(query_time)
utc_time = beijing_time.astimezone(timezone.utc)
utc_start_time = utc_time - timedelta(seconds=1)
utc_stop_time = utc_time + timedelta(seconds=1)
# 构建 Flux 查询语句
flux_query = f'''
from(bucket: "{bucket}")
|> range(start: {utc_start_time.isoformat()}, stop: {utc_stop_time.isoformat()})
|> filter(fn: (r) => r["_measurement"] == "node" or r["_measurement"] == "link")
|> pivot(
rowKey:["_time"],
columnKey:["_field"],
valueColumn:"_value"
)
'''
# 执行查询
tables = query_api.query(flux_query)
node_records = []
link_records = []
# 解析查询结果
for table in tables:
for record in table.records:
# print(record.values) # 打印完整记录内容
measurement = record["_measurement"]
# 处理 node 数据
if measurement == "node":
node_records.append({
"time": record["_time"],
"ID": record["ID"],
"head": record["head"],
"pressure": record["pressure"],
"actualdemand": record["actualdemand"],
"quality": record["quality"]
})
# 处理 link 数据
elif measurement == "link":
link_records.append({
"time": record["_time"],
"linkID": record["ID"],
"flow": record["flow"],
"velocity": record["velocity"],
"headloss": record["headloss"],
"quality": record["quality"],
"status": record["status"],
"setting": record["setting"],
"reaction": record["reaction"],
"friction": record["friction"]
})
return node_records, link_records
# DingZQ return list of dict { 'time': 'timevalue', 'value', 'valuevalue' }
def query_curve_by_ID_property_daterange(ID: str, type: str, property: str, start_date: str, end_date: str, bucket: str="realtime_data", client: InfluxDBClient=client) -> list:
"""
根据 type 查询对应的 measurement根据 ID 和 date 查询对应的 tag根据 property 查询对应的 field。
:param ID: (str): 要查询的 IDtag
:param type: (str): 查询的类型(决定 measurement
:param property: (str): 查询的字段名称field
:param start_date: (str): 查询的开始日期,格式为 'YYYY-MM-DD'
:param end_date: (str): 查询的结束日期,格式为 'YYYY-MM-DD'
:param bucket: (str): 数据存储的 bucket 名称,默认值为 "realtime_data"
:param client: (InfluxDBClient): 已初始化的 InfluxDBClient 实例
:return: 查询结果的列表
"""
if client.ping():
print("Successfully connected to InfluxDB.")
else:
print("Failed to connect to InfluxDB.")
query_api = client.query_api()
# 确定 measurement
if type == "node":
measurement = "node"
elif type == "link":
measurement = "link"
else:
raise ValueError(f"不支持的类型: {type}")
# 解析日期范围(当天的 UTC 开始和结束时间)
# previous_day = datetime.strptime(start_date, "%Y-%m-%d") - timedelta(days=1)
# start_time = previous_day.isoformat() + "T16:00:00Z"
# stop_time = datetime.strptime(end_date, "%Y-%m-%d").isoformat() + "T15:59:59Z"
# 将 start_date 的北京时间转换为 UTC 时间范围
start_time = (datetime.strptime(start_date, "%Y-%m-%d") - timedelta(days=1)).replace(hour=16, minute=0, second=0, tzinfo=timezone.utc).isoformat()
stop_time = datetime.strptime(end_date, "%Y-%m-%d").replace(hour=15, minute=59, second=59, tzinfo=timezone.utc).isoformat()
# 构建 Flux 查询语句
flux_query = f'''
from(bucket: "{bucket}")
|> range(start: {start_time}, stop: {stop_time})
|> filter(fn: (r) => r["_measurement"] == "{measurement}")
|> filter(fn: (r) => r["ID"] == "{ID}")
|> filter(fn: (r) => r["_field"] == "{property}")
'''
# 执行查询
tables = query_api.query(flux_query)
# 解析查询结果
results = []
for table in tables:
for record in table.records:
results.append({
"time": record["_time"],
"value": record["_value"]
})
return results
# 示例调用
if __name__ == "__main__":
url = "http://localhost:8086" # 替换为你的InfluxDB实例地址
token = "Z4UZj9HuLwLlwoApywvT2nGVP3bwLy18y-sJQ7enzZlJd8YMzMWbBA6F-q4gBiZ-7-IqdxR5aR9LvicKiSNmnA==" # 替换为你的InfluxDB Token
org_name = "beibei" # 替换为你的Organization名称
# client = InfluxDBClient(url=url, token=token)
# # 检查连接状态
# try:
# create_and_initialize_buckets(client, org_name)
# except Exception as e:
# print(f"连接失败: {e}")
# finally:
# client.close()
with InfluxDBClient(url=url, token=token, org=org_name) as client:
bucket_name = "realtime_data" # 数据存储的 bucket 名称
node_id = "ZBBDTZDP000022" # 查询的节点 ID
link_id = "ZBBGXSZW000002"
#
# # latest_record = query_latest_record_by_ID(ID=node_id, type="node", bucket=bucket_name, client=client)
# latest_record = query_latest_record_by_ID(ID=link_id, type="link", bucket=bucket_name, client=client)
#
# if latest_record:
# print("最新记录:", latest_record)
# else:
# print("未找到符合条件的记录。")
# node_records, link_records = query_all_record_by_time(query_time="2024-11-25T06:00:00+08:00")
# print("Node 数据:", node_records)
# print("Link 数据:", link_records)
curve_result = query_curve_by_ID_property_daterange(ID=node_id, type="node", property="head",
start_date="2024-11-25", end_date="2024-11-25")
print(curve_result)