diff --git a/InfluxDb Information.txt b/InfluxDb Information.txt new file mode 100644 index 0000000..9bcfa4c --- /dev/null +++ b/InfluxDb Information.txt @@ -0,0 +1,14 @@ +UserName tjwater +Password Tjwater@123456 +Organizatio TJWATEORG +Bucket TJWATERBUCKET + +API Token : xGDM5RZqRJAuzAGS-otXUdC2NFdY75qJAjRLqAB4p5WcIIAlIUpOpT8_yA16AOHmJWerwQ_08gwb84sy42jnZQ== + + + +influx config create --config-name onboarding ` + --host-url "http://localhost:8086" ` + --org "TJWATERORG" ` + --token "p4Hq6DQ4xI6yA2tZQgo-VGzjWObylyWd4B45vMoiae0XJeNUlL87FdEUU5cJ63O87W7-nAhhGWl_0FGJiL801w==" ` + --active diff --git a/influxdb_api.py b/influxdb_api.py new file mode 100644 index 0000000..e41dc5e --- /dev/null +++ b/influxdb_api.py @@ -0,0 +1,426 @@ +from influxdb_client import InfluxDBClient, BucketsApi, WriteApi, OrganizationsApi, Point, QueryApi +from typing import List, Dict +from datetime import datetime, timedelta, timezone +from dateutil import parser + +# influxdb数据库连接信息 +url = "http://localhost:8086" # 替换为你的InfluxDB实例地址 +token = "xGDM5RZqRJAuzAGS-otXUdC2NFdY75qJAjRLqAB4p5WcIIAlIUpOpT8_yA16AOHmJWerwQ_08gwb84sy42jnZQ==" # 替换为你的InfluxDB Token +org_name = "TJWATEORG" # 替换为你的Organization名称 +client = InfluxDBClient(url=url, token=token, org=org_name) + + +def create_and_initialize_buckets(client: InfluxDBClient, org_name: str) -> None: + + bucket_api = BucketsApi(client) + write_api = client.write_api() + org_api = OrganizationsApi(client) + + # 获取 org_id + org = next((o for o in org_api.find_organizations() if o.name == org_name), None) + if not org: + raise ValueError(f"Organization '{org_name}' not found.") + org_id = org.id + print(f"Using Organization ID: {org_id}") + + # 定义 Buckets 信息 + buckets = [ + {"name": "SCADA_data", "retention_rules": []}, + {"name": "realtime_data", "retention_rules": []}, + {"name": "scheme_simulation", "retention_rules": []} + ] + + # 创建 Buckets 并初始化数据 + for bucket in buckets: + # 创建 Bucket + created_bucket = bucket_api.create_bucket( + bucket_name=bucket["name"], + retention_rules=bucket["retention_rules"], + org_id=org_id + ) + print(f"Bucket '{bucket['name']}' created with ID: {created_bucket.id}") + + # 根据 Bucket 初始化数据 + if bucket["name"] == "SCADA_data": + point = Point("SCADA") \ + .tag("date", None) \ + .tag("type", None) \ + .tag("device_ID", None) \ + .field("monitored_value", 0.0) \ + .field("datacleaning_value", 0.0) \ + .field("simulation_value", 0.0) \ + .time("2024-11-21T00:00:00Z") + write_api.write(bucket="SCADA_data", org=org_name, record=point) + print("Initialized SCADA_data with default structure.") + + elif bucket["name"] == "realtime_data": + link_point = Point("link") \ + .tag("date", None) \ + .tag("ID", None) \ + .field("flow", 0.0) \ + .field("leakage", 0.0) \ + .field("velocity", 0.0) \ + .field("headloss", 0.0) \ + .field("status", None) \ + .field("setting", 0.0) \ + .field("quality", 0.0) \ + .time("2024-11-21T00:00:00Z") + + node_point = Point("node") \ + .tag("date", None) \ + .tag("ID", None) \ + .field("head", 0.0) \ + .field("pressure", 0.0) \ + .field("actualdemand", 0.0) \ + .field("demanddeficit", 0.0) \ + .field("totalExternalOutflow", 0.0) \ + .field("quality", 0.0) \ + .time("2024-11-21T00:00:00Z") + + write_api.write(bucket="realtime_data", org=org_name, record=link_point) + write_api.write(bucket="realtime_data", org=org_name, record=node_point) + print("Initialized realtime_data with default structure.") + + elif bucket["name"] == "scheme_simulation": + link_point = Point("link") \ + .tag("date", None) \ + .tag("ID", None) \ + .tag("scheme_Type", None) \ + .tag("scheme_Name", None) \ + .field("flow", 0.0) \ + .field("leakage", 0.0) \ + .field("velocity", 0.0) \ + .field("headloss", 0.0) \ + .field("status", None) \ + .field("setting", 0.0) \ + .field("quality", 0.0) \ + .time("2024-11-21T00:00:00Z") + + node_point = Point("node") \ + .tag("date", None) \ + .tag("ID", None) \ + .tag("scheme_Type", None) \ + .tag("scheme_Name", None) \ + .field("head", 0.0) \ + .field("pressure", 0.0) \ + .field("actualdemand", 0.0) \ + .field("demanddeficit", 0.0) \ + .field("totalExternalOutflow", 0.0) \ + .field("quality", 0.0) \ + .time("2024-11-21T00:00:00Z") + + write_api.write(bucket="scheme_simulation", org=org_name, record=link_point) + write_api.write(bucket="scheme_simulation", org=org_name, record=node_point) + print("Initialized scheme_simulation with default structure.") + + + print("All buckets created and initialized successfully.") + + +def store_realtime_data_to_influxdb(node_result_list: List[Dict[str, any]], link_result_list: List[Dict[str, any]], + result_start_time: str, + bucket: str = "realtime_data", client: InfluxDBClient = client): + """ + 将实时数据存储到 InfluxDB 的realtime_data这个bucket中。 + :param node_result_list: (List[Dict[str, any]]): 包含节点和结果数据的字典列表。 + :param link_result_list: (List[Dict[str, any]]): 包含连接和结果数据的字典列表。 + :param result_start_time: (str): 计算结果的模拟开始时间。 + :param bucket: (str): InfluxDB 的 bucket 名称,默认值为 "realtime_data"。 + :param client: (InfluxDBClient): 已初始化的 InfluxDBClient 实例。 + :return: + """ + + if client.ping(): + print("Successfully connected to InfluxDB.") + else: + print("Failed to connect to InfluxDB.") + + # 开始写入数据 + try: + write_api = client.write_api() + date_str = result_start_time.split('T')[0] + time_beijing = datetime.strptime(result_start_time, '%Y-%m-%dT%H:%M:%S%z').isoformat() + for result in node_result_list: + # 提取节点信息和结果数据 + node_id = result.get('node') + data_list = result.get('result', []) + + for data in data_list: + # 构建 Point 数据,多个 field 存在于一个数据点中 + node_point = Point("node") \ + .tag("date", date_str) \ + .tag("ID", node_id) \ + .field("head", data.get('head', 0.0)) \ + .field("pressure", data.get('pressure', 0.0)) \ + .field("actualdemand", data.get('demand', 0.0)) \ + .field("demanddeficit", None) \ + .field("totalExternalOutflow", None) \ + .field("quality", data.get('quality', 0.0)) \ + .time(time_beijing) + + # 写入数据到 InfluxDB,多个 field 在同一个 point 中 + write_api.write(bucket=bucket, org=org_name, record=node_point) + print(f"成功将 {len(node_result_list)} 条node数据写入 InfluxDB。") + for result in link_result_list: + link_id = result.get('link') + data_list = result.get('result', []) + + for data in data_list: + link_point = Point("link") \ + .tag("date", date_str) \ + .tag("ID", link_id) \ + .field("flow", data.get('flow', 0.0)) \ + .field("velocity", data.get('velocity', 0.0)) \ + .field("headloss", data.get('headloss', 0.0)) \ + .field("quality", data.get('quality', 0.0)) \ + .field("status", data.get('status', "UNKNOWN")) \ + .field("setting", data.get('setting', 0.0)) \ + .field("reaction", data.get('reaction', 0.0)) \ + .field("friction", data.get('friction', 0.0)) \ + .time(time_beijing) + write_api.write(bucket=bucket, org=org_name, record=link_point) + print(f"成功将 {len(link_result_list)} 条link数据写入 InfluxDB。") + + except Exception as e: + raise RuntimeError(f"数据写入 InfluxDB 时发生错误: {e}") + + +def query_latest_record_by_ID(ID: str, type: str, bucket: str="realtime_data", client: InfluxDBClient=client) -> dict: + """ + 查询指定ID的最新的一条记录 + :param ID: (str): 要查询的 ID。 + :param type: (str): "node"或“link” + :param bucket: (str): 数据存储的 bucket 名称。 + :param client: (InfluxDBClient): 已初始化的 InfluxDB 客户端实例。 + :return: dict: 最新记录的数据,如果没有找到则返回 None。 + """ + if client.ping(): + print("Successfully connected to InfluxDB.") + else: + print("Failed to connect to InfluxDB.") + query_api = client.query_api() + if type == "node": + flux_query = f''' + from(bucket: "{bucket}") + |> range(start: -30d) // 查找最近一月的记录 + |> filter(fn: (r) => r["_measurement"] == "node") + |> filter(fn: (r) => r["ID"] == "{ID}") + |> pivot( + rowKey:["_time"], + columnKey:["_field"], + valueColumn:"_value" + ) + |> sort(columns: ["_time"], desc: true) + |> limit(n: 1) + ''' + tables = query_api.query(flux_query) + # 解析查询结果 + for table in tables: + for record in table.records: + + return { + "time": record["_time"], + "nodeID": ID, + "head": record["head"], + "pressure": record["pressure"], + "actualdemand": record["actualdemand"], + # "demanddeficit": record["demanddeficit"], + # "totalExternalOutflow": record["totalExternalOutflow"], + "quality": record["quality"] + } + elif type == "link": + flux_query = f''' + from(bucket: "{bucket}") + |> range(start: -30d) // 查找最近一月的记录 + |> filter(fn: (r) => r["_measurement"] == "link") + |> filter(fn: (r) => r["ID"] == "{ID}") + |> pivot( + rowKey:["_time"], + columnKey:["_field"], + valueColumn:"_value" + ) + |> sort(columns: ["_time"], desc: true) + |> limit(n: 1) + ''' + tables = query_api.query(flux_query) + # 解析查询结果 + for table in tables: + for record in table.records: + return { + "time": record["_time"], + "linkID": ID, + "flow": record["flow"], + "velocity": record["velocity"], + "headloss": record["headloss"], + "quality": record["quality"], + "status": record["status"], + "setting": record["setting"], + "reaction": record["reaction"], + "friction": record["friction"] + } + return None # 如果没有找到记录 + + +def query_all_record_by_time(query_time: str, bucket: str="realtime_data", client: InfluxDBClient=client) -> tuple: + """ + 查询指定北京时间的所有记录,包括 'node' 和 'link' measurement,分别以指定格式返回。 + :param query_time: (str): 输入的北京时间,格式为 '2024-11-24T17:30:00+08:00'。 + :param bucket: (str): 数据存储的 bucket 名称。 + :param client: (InfluxDBClient): 已初始化的 InfluxDBClient 实例。 + :return: dict: tuple: (node_records, link_records) + """ + if client.ping(): + print("Successfully connected to InfluxDB.") + else: + print("Failed to connect to InfluxDB.") + query_api = client.query_api() + # 将北京时间转换为 UTC 时间 + beijing_time = datetime.fromisoformat(query_time) + utc_time = beijing_time.astimezone(timezone.utc) + utc_start_time = utc_time - timedelta(seconds=1) + utc_stop_time = utc_time + timedelta(seconds=1) + + # 构建 Flux 查询语句 + flux_query = f''' + from(bucket: "{bucket}") + |> range(start: {utc_start_time.isoformat()}, stop: {utc_stop_time.isoformat()}) + |> filter(fn: (r) => r["_measurement"] == "node" or r["_measurement"] == "link") + |> pivot( + rowKey:["_time"], + columnKey:["_field"], + valueColumn:"_value" + ) + ''' + + # 执行查询 + tables = query_api.query(flux_query) + node_records = [] + link_records = [] + # 解析查询结果 + + for table in tables: + for record in table.records: + # print(record.values) # 打印完整记录内容 + measurement = record["_measurement"] + # 处理 node 数据 + if measurement == "node": + node_records.append({ + "time": record["_time"], + "ID": record["ID"], + "head": record["head"], + "pressure": record["pressure"], + "actualdemand": record["actualdemand"], + "quality": record["quality"] + }) + + # 处理 link 数据 + elif measurement == "link": + link_records.append({ + "time": record["_time"], + "linkID": record["ID"], + "flow": record["flow"], + "velocity": record["velocity"], + "headloss": record["headloss"], + "quality": record["quality"], + "status": record["status"], + "setting": record["setting"], + "reaction": record["reaction"], + "friction": record["friction"] + }) + + return node_records, link_records + + +def query_curve_by_ID_property_daterange(ID: str, type: str, property: str, start_date: str, end_date: str, bucket: str="realtime_data", client: InfluxDBClient=client) -> list: + """ + 根据 type 查询对应的 measurement,根据 ID 和 date 查询对应的 tag,根据 property 查询对应的 field。 + :param ID: (str): 要查询的 ID(tag) + :param type: (str): 查询的类型(决定 measurement) + :param property: (str): 查询的字段名称(field) + :param start_date: (str): 查询的开始日期,格式为 'YYYY-MM-DD' + :param end_date: (str): 查询的结束日期,格式为 'YYYY-MM-DD' + :param bucket: (str): 数据存储的 bucket 名称,默认值为 "realtime_data" + :param client: (InfluxDBClient): 已初始化的 InfluxDBClient 实例 + :return: 查询结果的列表 + """ + if client.ping(): + print("Successfully connected to InfluxDB.") + else: + print("Failed to connect to InfluxDB.") + query_api = client.query_api() + # 确定 measurement + if type == "node": + measurement = "node" + elif type == "link": + measurement = "link" + else: + raise ValueError(f"不支持的类型: {type}") + + # 解析日期范围(当天的 UTC 开始和结束时间) + # previous_day = datetime.strptime(start_date, "%Y-%m-%d") - timedelta(days=1) + # start_time = previous_day.isoformat() + "T16:00:00Z" + # stop_time = datetime.strptime(end_date, "%Y-%m-%d").isoformat() + "T15:59:59Z" + + # 将 start_date 的北京时间转换为 UTC 时间范围 + start_time = (datetime.strptime(start_date, "%Y-%m-%d") - timedelta(days=1)).replace(hour=16, minute=0, second=0, tzinfo=timezone.utc).isoformat() + stop_time = datetime.strptime(end_date, "%Y-%m-%d").replace(hour=15, minute=59, second=59, tzinfo=timezone.utc).isoformat() + + # 构建 Flux 查询语句 + flux_query = f''' + from(bucket: "{bucket}") + |> range(start: {start_time}, stop: {stop_time}) + |> filter(fn: (r) => r["_measurement"] == "{measurement}") + |> filter(fn: (r) => r["ID"] == "{ID}") + |> filter(fn: (r) => r["_field"] == "{property}") + ''' + + # 执行查询 + tables = query_api.query(flux_query) + + # 解析查询结果 + results = [] + for table in tables: + for record in table.records: + results.append({ + "time": record["_time"], + "value": record["_value"] + }) + + return results + +# 示例调用 +if __name__ == "__main__": + url = "http://localhost:8086" # 替换为你的InfluxDB实例地址 + token = "Z4UZj9HuLwLlwoApywvT2nGVP3bwLy18y-sJQ7enzZlJd8YMzMWbBA6F-q4gBiZ-7-IqdxR5aR9LvicKiSNmnA==" # 替换为你的InfluxDB Token + org_name = "beibei" # 替换为你的Organization名称 + + # client = InfluxDBClient(url=url, token=token) + + # # 检查连接状态 + # try: + # create_and_initialize_buckets(client, org_name) + # except Exception as e: + # print(f"连接失败: {e}") + # finally: + # client.close() + with InfluxDBClient(url=url, token=token, org=org_name) as client: + + bucket_name = "realtime_data" # 数据存储的 bucket 名称 + node_id = "ZBBDTZDP000022" # 查询的节点 ID + link_id = "ZBBGXSZW000002" + # + # # latest_record = query_latest_record_by_ID(ID=node_id, type="node", bucket=bucket_name, client=client) + # latest_record = query_latest_record_by_ID(ID=link_id, type="link", bucket=bucket_name, client=client) + # + # if latest_record: + # print("最新记录:", latest_record) + # else: + # print("未找到符合条件的记录。") + + # node_records, link_records = query_all_record_by_time(query_time="2024-11-25T06:00:00+08:00") + # print("Node 数据:", node_records) + # print("Link 数据:", link_records) + + curve_result = query_curve_by_ID_property_daterange(ID=node_id, type="node", property="head", + start_date="2024-11-25", end_date="2024-11-25") + print(curve_result)