Add influx api

This commit is contained in:
DingZQ
2024-12-09 23:28:47 +08:00
parent c7d51769e1
commit 41e80212e1
2 changed files with 440 additions and 0 deletions

426
influxdb_api.py Normal file
View File

@@ -0,0 +1,426 @@
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List

from dateutil import parser
from influxdb_client import (
    BucketsApi,
    InfluxDBClient,
    OrganizationsApi,
    Point,
    QueryApi,
    WriteApi,
)
from influxdb_client.client.write_api import SYNCHRONOUS
# InfluxDB connection settings.
# NOTE(review): the API token is hard-coded in source — consider loading it
# from an environment variable or a config file instead of committing it.
url = "http://localhost:8086" # replace with your InfluxDB instance address
token = "xGDM5RZqRJAuzAGS-otXUdC2NFdY75qJAjRLqAB4p5WcIIAlIUpOpT8_yA16AOHmJWerwQ_08gwb84sy42jnZQ==" # replace with your InfluxDB token
org_name = "TJWATEORG" # replace with your organization name
# Module-level client, used as the default `client` argument by the functions below.
client = InfluxDBClient(url=url, token=token, org=org_name)
def _seed_points(bucket_name: str) -> List[Point]:
    """Build the placeholder points used to seed a freshly created bucket.

    The points only establish each bucket's measurement/tag/field schema;
    the values are dummies and the timestamp is a fixed date.

    :param bucket_name: name of the bucket to build seed points for
    :return: list of Point objects (empty for an unknown bucket name)
    """
    init_time = "2024-11-21T00:00:00Z"
    if bucket_name == "SCADA_data":
        point = Point("SCADA") \
            .tag("date", None) \
            .tag("type", None) \
            .tag("device_ID", None) \
            .field("monitored_value", 0.0) \
            .field("datacleaning_value", 0.0) \
            .field("simulation_value", 0.0) \
            .time(init_time)
        return [point]
    if bucket_name == "realtime_data":
        link_point = Point("link") \
            .tag("date", None) \
            .tag("ID", None) \
            .field("flow", 0.0) \
            .field("leakage", 0.0) \
            .field("velocity", 0.0) \
            .field("headloss", 0.0) \
            .field("status", None) \
            .field("setting", 0.0) \
            .field("quality", 0.0) \
            .time(init_time)
        node_point = Point("node") \
            .tag("date", None) \
            .tag("ID", None) \
            .field("head", 0.0) \
            .field("pressure", 0.0) \
            .field("actualdemand", 0.0) \
            .field("demanddeficit", 0.0) \
            .field("totalExternalOutflow", 0.0) \
            .field("quality", 0.0) \
            .time(init_time)
        return [link_point, node_point]
    if bucket_name == "scheme_simulation":
        # Same structure as realtime_data, plus scheme identification tags.
        link_point = Point("link") \
            .tag("date", None) \
            .tag("ID", None) \
            .tag("scheme_Type", None) \
            .tag("scheme_Name", None) \
            .field("flow", 0.0) \
            .field("leakage", 0.0) \
            .field("velocity", 0.0) \
            .field("headloss", 0.0) \
            .field("status", None) \
            .field("setting", 0.0) \
            .field("quality", 0.0) \
            .time(init_time)
        node_point = Point("node") \
            .tag("date", None) \
            .tag("ID", None) \
            .tag("scheme_Type", None) \
            .tag("scheme_Name", None) \
            .field("head", 0.0) \
            .field("pressure", 0.0) \
            .field("actualdemand", 0.0) \
            .field("demanddeficit", 0.0) \
            .field("totalExternalOutflow", 0.0) \
            .field("quality", 0.0) \
            .time(init_time)
        return [link_point, node_point]
    return []


def create_and_initialize_buckets(client: InfluxDBClient, org_name: str) -> None:
    """Create the application's three buckets and seed each with schema points.

    :param client: connected InfluxDBClient instance
    :param org_name: organization name under which the buckets are created
    :raises ValueError: if the organization cannot be found
    """
    bucket_api = BucketsApi(client)
    # SYNCHRONOUS mode flushes every write immediately; the default batching
    # writer must be flushed/closed explicitly or the seed points can be
    # silently dropped when the function returns.
    write_api = client.write_api(write_options=SYNCHRONOUS)
    org_api = OrganizationsApi(client)
    # Resolve the organization name to its ID, required by create_bucket.
    org = next((o for o in org_api.find_organizations() if o.name == org_name), None)
    if not org:
        raise ValueError(f"Organization '{org_name}' not found.")
    org_id = org.id
    print(f"Using Organization ID: {org_id}")
    # Buckets to create; empty retention rules mean data is kept forever.
    buckets = [
        {"name": "SCADA_data", "retention_rules": []},
        {"name": "realtime_data", "retention_rules": []},
        {"name": "scheme_simulation", "retention_rules": []},
    ]
    for bucket in buckets:
        created_bucket = bucket_api.create_bucket(
            bucket_name=bucket["name"],
            retention_rules=bucket["retention_rules"],
            org_id=org_id
        )
        print(f"Bucket '{bucket['name']}' created with ID: {created_bucket.id}")
        # Seed the new bucket with its default structure.
        for point in _seed_points(bucket["name"]):
            write_api.write(bucket=bucket["name"], org=org_name, record=point)
        print(f"Initialized {bucket['name']} with default structure.")
    print("All buckets created and initialized successfully.")
def store_realtime_data_to_influxdb(node_result_list: List[Dict[str, Any]],
                                    link_result_list: List[Dict[str, Any]],
                                    result_start_time: str,
                                    bucket: str = "realtime_data", client: InfluxDBClient = client):
    """Store one batch of realtime simulation results in InfluxDB.

    :param node_result_list: dicts of the form {"node": <id>, "result": [<data dict>, ...]}
    :param link_result_list: dicts of the form {"link": <id>, "result": [<data dict>, ...]}
    :param result_start_time: simulation start time, e.g. '2024-11-24T17:30:00+08:00'
    :param bucket: destination bucket name, defaults to "realtime_data"
    :param client: initialized InfluxDBClient instance
    :raises RuntimeError: if writing to InfluxDB fails
    """
    # Connectivity check is best-effort: a failed ping is only reported, the
    # write is still attempted (and will raise RuntimeError if it fails).
    if client.ping():
        print("Successfully connected to InfluxDB.")
    else:
        print("Failed to connect to InfluxDB.")
    try:
        # SYNCHRONOUS mode flushes each write immediately; the default
        # batching writer would require an explicit flush/close, otherwise
        # buffered points can be lost when the function returns.
        write_api = client.write_api(write_options=SYNCHRONOUS)
        date_str = result_start_time.split('T')[0]
        time_beijing = datetime.strptime(result_start_time, '%Y-%m-%dT%H:%M:%S%z').isoformat()
        for result in node_result_list:
            node_id = result.get('node')
            for data in result.get('result', []):
                # One point per timestep; all fields share the same timestamp.
                # NOTE(review): None-valued fields (demanddeficit,
                # totalExternalOutflow) are skipped by the client's
                # line-protocol serializer — presumably placeholders for
                # values not yet computed; confirm with the simulation side.
                node_point = Point("node") \
                    .tag("date", date_str) \
                    .tag("ID", node_id) \
                    .field("head", data.get('head', 0.0)) \
                    .field("pressure", data.get('pressure', 0.0)) \
                    .field("actualdemand", data.get('demand', 0.0)) \
                    .field("demanddeficit", None) \
                    .field("totalExternalOutflow", None) \
                    .field("quality", data.get('quality', 0.0)) \
                    .time(time_beijing)
                write_api.write(bucket=bucket, org=org_name, record=node_point)
        print(f"成功将 {len(node_result_list)} 条node数据写入 InfluxDB。")
        for result in link_result_list:
            link_id = result.get('link')
            for data in result.get('result', []):
                link_point = Point("link") \
                    .tag("date", date_str) \
                    .tag("ID", link_id) \
                    .field("flow", data.get('flow', 0.0)) \
                    .field("velocity", data.get('velocity', 0.0)) \
                    .field("headloss", data.get('headloss', 0.0)) \
                    .field("quality", data.get('quality', 0.0)) \
                    .field("status", data.get('status', "UNKNOWN")) \
                    .field("setting", data.get('setting', 0.0)) \
                    .field("reaction", data.get('reaction', 0.0)) \
                    .field("friction", data.get('friction', 0.0)) \
                    .time(time_beijing)
                write_api.write(bucket=bucket, org=org_name, record=link_point)
        print(f"成功将 {len(link_result_list)} 条link数据写入 InfluxDB。")
    except Exception as e:
        # Chain the original cause for easier debugging.
        raise RuntimeError(f"数据写入 InfluxDB 时发生错误: {e}") from e
def query_latest_record_by_ID(ID: str, type: str, bucket: str="realtime_data", client: InfluxDBClient=client) -> dict:
    """Return the most recent record (within the last 30 days) for an ID.

    :param ID: node or link ID to look up
    :param type: "node" or "link" (selects the measurement)
    :param bucket: bucket to query, defaults to "realtime_data"
    :param client: initialized InfluxDBClient instance
    :return: dict with the latest record's values, or None if nothing matched
    """
    if client.ping():
        print("Successfully connected to InfluxDB.")
    else:
        print("Failed to connect to InfluxDB.")
    # Unknown type: nothing to query (original fell through to None as well).
    if type not in ("node", "link"):
        return None
    query_api = client.query_api()
    # Pivot fields into columns so one record carries every value for a
    # timestamp, then keep only the newest row.
    flux_query = f'''
    from(bucket: "{bucket}")
        |> range(start: -30d) // look back at most one month
        |> filter(fn: (r) => r["_measurement"] == "{type}")
        |> filter(fn: (r) => r["ID"] == "{ID}")
        |> pivot(
            rowKey:["_time"],
            columnKey:["_field"],
            valueColumn:"_value"
        )
        |> sort(columns: ["_time"], desc: true)
        |> limit(n: 1)
    '''
    tables = query_api.query(flux_query)
    if type == "node":
        id_key = "nodeID"
        fields = ("head", "pressure", "actualdemand", "quality")
    else:
        id_key = "linkID"
        fields = ("flow", "velocity", "headloss", "quality",
                  "status", "setting", "reaction", "friction")
    for table in tables:
        for record in table.records:
            result = {"time": record["_time"], id_key: ID}
            # values.get avoids a KeyError when a field is missing from
            # this pivoted row (e.g. it was never written for this ID).
            for field in fields:
                result[field] = record.values.get(field)
            return result
    return None  # no record found
def query_all_record_by_time(query_time: str, bucket: str="realtime_data", client: InfluxDBClient=client) -> tuple:
    """Fetch every 'node' and 'link' record stored at a given Beijing time.

    :param query_time: Beijing local time, e.g. '2024-11-24T17:30:00+08:00'
    :param bucket: bucket to query
    :param client: initialized InfluxDBClient instance
    :return: tuple (node_records, link_records) of dict lists
    """
    print("Successfully connected to InfluxDB." if client.ping()
          else "Failed to connect to InfluxDB.")
    query_api = client.query_api()
    # Convert the Beijing timestamp to UTC and query a +/-1s window around it.
    target_utc = datetime.fromisoformat(query_time).astimezone(timezone.utc)
    window = timedelta(seconds=1)
    flux_query = f'''
    from(bucket: "{bucket}")
        |> range(start: {(target_utc - window).isoformat()}, stop: {(target_utc + window).isoformat()})
        |> filter(fn: (r) => r["_measurement"] == "node" or r["_measurement"] == "link")
        |> pivot(
            rowKey:["_time"],
            columnKey:["_field"],
            valueColumn:"_value"
        )
    '''
    tables = query_api.query(flux_query)
    node_records = []
    link_records = []
    node_fields = ("head", "pressure", "actualdemand", "quality")
    link_fields = ("flow", "velocity", "headloss", "quality",
                   "status", "setting", "reaction", "friction")
    for table in tables:
        for record in table.records:
            kind = record["_measurement"]
            if kind == "node":
                entry = {"time": record["_time"], "ID": record["ID"]}
                for field in node_fields:
                    entry[field] = record[field]
                node_records.append(entry)
            elif kind == "link":
                entry = {"time": record["_time"], "linkID": record["ID"]}
                for field in link_fields:
                    entry[field] = record[field]
                link_records.append(entry)
    return node_records, link_records
def query_curve_by_ID_property_daterange(ID: str, type: str, property: str, start_date: str, end_date: str, bucket: str="realtime_data", client: InfluxDBClient=client) -> list:
    """Query one field's time series for an ID over a Beijing-local date range.

    The measurement is chosen by ``type``, the series is filtered by the
    ``ID`` tag, and only the ``property`` field is returned.

    :param ID: ID tag value to filter on
    :param type: "node" or "link" (selects the measurement)
    :param property: field name to extract
    :param start_date: range start, 'YYYY-MM-DD' (Beijing local date)
    :param end_date: range end, 'YYYY-MM-DD' (Beijing local date)
    :param bucket: bucket to query, defaults to "realtime_data"
    :param client: initialized InfluxDBClient instance
    :return: list of {"time": ..., "value": ...} dicts
    :raises ValueError: if type is neither "node" nor "link"
    """
    print("Successfully connected to InfluxDB." if client.ping()
          else "Failed to connect to InfluxDB.")
    query_api = client.query_api()
    if type not in ("node", "link"):
        raise ValueError(f"不支持的类型: {type}")
    measurement = type
    # Beijing midnight on start_date == 16:00 UTC of the previous day;
    # Beijing 23:59:59 on end_date == 15:59:59 UTC of end_date itself.
    day_before = datetime.strptime(start_date, "%Y-%m-%d") - timedelta(days=1)
    start_time = day_before.replace(hour=16, minute=0, second=0, tzinfo=timezone.utc).isoformat()
    end_day = datetime.strptime(end_date, "%Y-%m-%d")
    stop_time = end_day.replace(hour=15, minute=59, second=59, tzinfo=timezone.utc).isoformat()
    flux_query = f'''
    from(bucket: "{bucket}")
        |> range(start: {start_time}, stop: {stop_time})
        |> filter(fn: (r) => r["_measurement"] == "{measurement}")
        |> filter(fn: (r) => r["ID"] == "{ID}")
        |> filter(fn: (r) => r["_field"] == "{property}")
    '''
    tables = query_api.query(flux_query)
    return [
        {"time": record["_time"], "value": record["_value"]}
        for table in tables
        for record in table.records
    ]
# Example usage: query one day's head curve for a single node.
if __name__ == "__main__":
    url = "http://localhost:8086"  # replace with your InfluxDB instance address
    token = "Z4UZj9HuLwLlwoApywvT2nGVP3bwLy18y-sJQ7enzZlJd8YMzMWbBA6F-q4gBiZ-7-IqdxR5aR9LvicKiSNmnA=="  # replace with your InfluxDB token
    org_name = "beibei"  # replace with your organization name
    # Context manager closes the client on exit.
    with InfluxDBClient(url=url, token=token, org=org_name) as client:
        bucket_name = "realtime_data"   # bucket holding the realtime data
        node_id = "ZBBDTZDP000022"      # node ID to query
        link_id = "ZBBGXSZW000002"      # example link ID (unused below)
        curve_result = query_curve_by_ID_property_daterange(
            ID=node_id, type="node", property="head",
            start_date="2024-11-25", end_date="2024-11-25")
        print(curve_result)