from influxdb_client import InfluxDBClient, BucketsApi, WriteApi, OrganizationsApi, Point, QueryApi
from typing import Any, Dict, List, Optional
from datetime import datetime, timedelta, timezone
from dateutil import parser  # NOTE(review): unused in this module — confirm nothing re-imports it from here before removing

# InfluxDB connection settings.
# NOTE(review): credentials are hard-coded; prefer environment variables / a config file in production.
url = "http://localhost:8086"  # replace with your InfluxDB instance address
token = "xGDM5RZqRJAuzAGS-otXUdC2NFdY75qJAjRLqAB4p5WcIIAlIUpOpT8_yA16AOHmJWerwQ_08gwb84sy42jnZQ=="  # replace with your InfluxDB token
org_name = "TJWATEORG"  # replace with your organization name
client = InfluxDBClient(url=url, token=token, org=org_name)

# Timestamp shared by all schema-template points written at bucket creation time.
_TEMPLATE_TIME = "2024-11-21T00:00:00Z"


def _template_link_point(extra_tags: Optional[Dict[str, Any]] = None) -> Point:
    """Build a zero-valued 'link' schema-template point.

    Tags/fields passed as None are skipped by the client at serialization time,
    so they only document the intended schema.
    """
    point = Point("link").tag("date", None).tag("ID", None)
    for key, value in (extra_tags or {}).items():
        point = point.tag(key, value)
    return (point
            .field("flow", 0.0)
            .field("leakage", 0.0)
            .field("velocity", 0.0)
            .field("headloss", 0.0)
            .field("status", None)
            .field("setting", 0.0)
            .field("quality", 0.0)
            .time(_TEMPLATE_TIME))


def _template_node_point(extra_tags: Optional[Dict[str, Any]] = None) -> Point:
    """Build a zero-valued 'node' schema-template point (see _template_link_point)."""
    point = Point("node").tag("date", None).tag("ID", None)
    for key, value in (extra_tags or {}).items():
        point = point.tag(key, value)
    return (point
            .field("head", 0.0)
            .field("pressure", 0.0)
            .field("actualdemand", 0.0)
            .field("demanddeficit", 0.0)
            .field("totalExternalOutflow", 0.0)
            .field("quality", 0.0)
            .time(_TEMPLATE_TIME))


def create_and_initialize_buckets(client: InfluxDBClient, org_name: str) -> None:
    """Create the project's buckets and write a schema-template point into each.

    Buckets created: ``SCADA_data``, ``realtime_data``, ``scheme_simulation``
    (all with no retention rules, i.e. data is kept forever).

    :param client: an initialized InfluxDBClient
    :param org_name: organization name, used to resolve the organization id
    :raises ValueError: if no organization with that name exists
    """
    bucket_api = BucketsApi(client)
    write_api = client.write_api()
    org_api = OrganizationsApi(client)

    # Resolve the organization id from its name.
    org = next((o for o in org_api.find_organizations() if o.name == org_name), None)
    if not org:
        raise ValueError(f"Organization '{org_name}' not found.")
    org_id = org.id
    print(f"Using Organization ID: {org_id}")

    # Bucket definitions: empty retention_rules means infinite retention.
    buckets = [
        {"name": "SCADA_data", "retention_rules": []},
        {"name": "realtime_data", "retention_rules": []},
        {"name": "scheme_simulation", "retention_rules": []},
    ]

    for bucket in buckets:
        created_bucket = bucket_api.create_bucket(
            bucket_name=bucket["name"],
            retention_rules=bucket["retention_rules"],
            org_id=org_id,
        )
        print(f"Bucket '{bucket['name']}' created with ID: {created_bucket.id}")

        # Seed each bucket with template point(s) documenting its measurement schema.
        if bucket["name"] == "SCADA_data":
            point = (Point("SCADA")
                     .tag("date", None)
                     .tag("type", None)
                     .tag("device_ID", None)
                     .field("monitored_value", 0.0)
                     .field("datacleaning_value", 0.0)
                     .field("simulation_value", 0.0)
                     .time(_TEMPLATE_TIME))
            write_api.write(bucket="SCADA_data", org=org_name, record=point)
            print("Initialized SCADA_data with default structure.")

        elif bucket["name"] == "realtime_data":
            write_api.write(bucket="realtime_data", org=org_name, record=_template_link_point())
            write_api.write(bucket="realtime_data", org=org_name, record=_template_node_point())
            print("Initialized realtime_data with default structure.")

        elif bucket["name"] == "scheme_simulation":
            scheme_tags = {"scheme_Type": None, "scheme_Name": None}
            write_api.write(bucket="scheme_simulation", org=org_name,
                            record=_template_link_point(scheme_tags))
            write_api.write(bucket="scheme_simulation", org=org_name,
                            record=_template_node_point(scheme_tags))
            print("Initialized scheme_simulation with default structure.")

    print("All buckets created and initialized successfully.")


def store_realtime_data_to_influxdb(node_result_list: List[Dict[str, Any]],
                                    link_result_list: List[Dict[str, Any]],
                                    result_start_time: str,
                                    bucket: str = "realtime_data",
                                    client: InfluxDBClient = client):
    """Store real-time simulation results in the ``realtime_data`` bucket.

    :param node_result_list: list of dicts shaped ``{'node': id, 'result': [ {...}, ... ]}``
    :param link_result_list: list of dicts shaped ``{'link': id, 'result': [ {...}, ... ]}``
    :param result_start_time: simulation start time, e.g. ``'2024-11-21T00:00:00+08:00'``
                              (must match ``%Y-%m-%dT%H:%M:%S%z``)
    :param bucket: target bucket name, defaults to "realtime_data"
    :param client: an initialized InfluxDBClient
    :raises RuntimeError: if any write fails
    """
    if client.ping():
        print("Successfully connected to InfluxDB.")
    else:
        print("Failed to connect to InfluxDB.")

    try:
        write_api = client.write_api()
        # The 'date' tag is the calendar date portion of the start time.
        date_str = result_start_time.split('T')[0]
        time_beijing = datetime.strptime(result_start_time, '%Y-%m-%dT%H:%M:%S%z').isoformat()

        for result in node_result_list:
            node_id = result.get('node')
            for data in result.get('result', []):
                # All fields for one timestamp go into a single point.
                # None-valued fields are skipped by the client (schema placeholders).
                node_point = (Point("node")
                              .tag("date", date_str)
                              .tag("ID", node_id)
                              .field("head", data.get('head', 0.0))
                              .field("pressure", data.get('pressure', 0.0))
                              .field("actualdemand", data.get('demand', 0.0))
                              .field("demanddeficit", None)
                              .field("totalExternalOutflow", None)
                              .field("quality", data.get('quality', 0.0))
                              .time(time_beijing))
                write_api.write(bucket=bucket, org=org_name, record=node_point)
        print(f"成功将 {len(node_result_list)} 条node数据写入 InfluxDB。")

        for result in link_result_list:
            link_id = result.get('link')
            for data in result.get('result', []):
                link_point = (Point("link")
                              .tag("date", date_str)
                              .tag("ID", link_id)
                              .field("flow", data.get('flow', 0.0))
                              .field("velocity", data.get('velocity', 0.0))
                              .field("headloss", data.get('headloss', 0.0))
                              .field("quality", data.get('quality', 0.0))
                              .field("status", data.get('status', "UNKNOWN"))
                              .field("setting", data.get('setting', 0.0))
                              .field("reaction", data.get('reaction', 0.0))
                              .field("friction", data.get('friction', 0.0))
                              .time(time_beijing))
                write_api.write(bucket=bucket, org=org_name, record=link_point)
        print(f"成功将 {len(link_result_list)} 条link数据写入 InfluxDB。")

        # The default write API batches asynchronously; close it to flush
        # pending points so nothing is lost if the caller exits right away.
        write_api.close()
    except Exception as e:
        raise RuntimeError(f"数据写入 InfluxDB 时发生错误: {e}") from e


def query_latest_record_by_ID(ID: str, type: str, bucket: str = "realtime_data",
                              client: InfluxDBClient = client) -> dict:
    """Query the most recent record (within the last 30 days) for a given ID.

    :param ID: the node/link ID (stored as the "ID" tag)
    :param type: "node" or "link" — selects the measurement
    :param bucket: bucket name to query
    :param client: an initialized InfluxDBClient
    :return: dict with the latest record's fields, or None if nothing matched
             (an unknown ``type`` also yields None, matching the original behavior)
    """
    if client.ping():
        print("Successfully connected to InfluxDB.")
    else:
        print("Failed to connect to InfluxDB.")

    if type not in ("node", "link"):
        return None

    query_api = client.query_api()
    # Pivot turns one row per field into one row per timestamp with a column
    # per field, so a single record carries every field of the latest point.
    flux_query = f'''
    from(bucket: "{bucket}")
        |> range(start: -30d)  // look back at most one month
        |> filter(fn: (r) => r["_measurement"] == "{type}")
        |> filter(fn: (r) => r["ID"] == "{ID}")
        |> pivot(
            rowKey:["_time"],
            columnKey:["_field"],
            valueColumn:"_value"
        )
        |> sort(columns: ["_time"], desc: true)
        |> limit(n: 1)
    '''
    tables = query_api.query(flux_query)

    for table in tables:
        for record in table.records:
            if type == "node":
                return {
                    "time": record["_time"],
                    "nodeID": ID,
                    "head": record["head"],
                    "pressure": record["pressure"],
                    "actualdemand": record["actualdemand"],
                    "quality": record["quality"],
                }
            return {
                "time": record["_time"],
                "linkID": ID,
                "flow": record["flow"],
                "velocity": record["velocity"],
                "headloss": record["headloss"],
                "quality": record["quality"],
                "status": record["status"],
                "setting": record["setting"],
                "reaction": record["reaction"],
                "friction": record["friction"],
            }
    return None  # no matching record found


def query_all_record_by_time(query_time: str, bucket: str = "realtime_data",
                             client: InfluxDBClient = client) -> tuple:
    """Query all 'node' and 'link' records at a given Beijing-time instant.

    :param query_time: Beijing time with offset, e.g. '2024-11-24T17:30:00+08:00'
    :param bucket: bucket name to query
    :param client: an initialized InfluxDBClient
    :return: tuple ``(node_records, link_records)`` of lists of dicts
    """
    if client.ping():
        print("Successfully connected to InfluxDB.")
    else:
        print("Failed to connect to InfluxDB.")

    query_api = client.query_api()

    # Convert the Beijing timestamp to UTC and query a +/- 1 second window
    # around it, since Flux ranges are half-open and timestamps must match.
    beijing_time = datetime.fromisoformat(query_time)
    utc_time = beijing_time.astimezone(timezone.utc)
    utc_start_time = utc_time - timedelta(seconds=1)
    utc_stop_time = utc_time + timedelta(seconds=1)

    flux_query = f'''
    from(bucket: "{bucket}")
        |> range(start: {utc_start_time.isoformat()}, stop: {utc_stop_time.isoformat()})
        |> filter(fn: (r) => r["_measurement"] == "node" or r["_measurement"] == "link")
        |> pivot(
            rowKey:["_time"],
            columnKey:["_field"],
            valueColumn:"_value"
        )
    '''
    tables = query_api.query(flux_query)

    node_records = []
    link_records = []
    for table in tables:
        for record in table.records:
            measurement = record["_measurement"]
            if measurement == "node":
                node_records.append({
                    "time": record["_time"],
                    "ID": record["ID"],
                    "head": record["head"],
                    "pressure": record["pressure"],
                    "actualdemand": record["actualdemand"],
                    "quality": record["quality"],
                })
            elif measurement == "link":
                link_records.append({
                    "time": record["_time"],
                    "linkID": record["ID"],
                    "flow": record["flow"],
                    "velocity": record["velocity"],
                    "headloss": record["headloss"],
                    "quality": record["quality"],
                    "status": record["status"],
                    "setting": record["setting"],
                    "reaction": record["reaction"],
                    "friction": record["friction"],
                })
    return node_records, link_records


def query_curve_by_ID_property_daterange(ID: str, type: str, property: str,
                                         start_date: str, end_date: str,
                                         bucket: str = "realtime_data",
                                         client: InfluxDBClient = client) -> list:
    """Query one field's time series for an ID over a Beijing-time date range.

    :param ID: the node/link ID (tag)
    :param type: "node" or "link" — selects the measurement
    :param property: the field name to retrieve
    :param start_date: start date, 'YYYY-MM-DD' (interpreted as Beijing time)
    :param end_date: end date, 'YYYY-MM-DD' (interpreted as Beijing time)
    :param bucket: bucket name, defaults to "realtime_data"
    :param client: an initialized InfluxDBClient
    :return: list of ``{"time": ..., "value": ...}`` dicts
    :raises ValueError: if ``type`` is neither "node" nor "link"
    """
    if client.ping():
        print("Successfully connected to InfluxDB.")
    else:
        print("Failed to connect to InfluxDB.")

    query_api = client.query_api()

    if type == "node":
        measurement = "node"
    elif type == "link":
        measurement = "link"
    else:
        raise ValueError(f"不支持的类型: {type}")

    # Map the Beijing-time day range onto UTC: Beijing is UTC+8, so
    # start_date 00:00:00 Beijing == previous day 16:00:00 UTC, and
    # end_date 23:59:59 Beijing == end_date 15:59:59 UTC.
    start_time = (datetime.strptime(start_date, "%Y-%m-%d") - timedelta(days=1)) \
        .replace(hour=16, minute=0, second=0, tzinfo=timezone.utc).isoformat()
    stop_time = datetime.strptime(end_date, "%Y-%m-%d") \
        .replace(hour=15, minute=59, second=59, tzinfo=timezone.utc).isoformat()

    flux_query = f'''
    from(bucket: "{bucket}")
        |> range(start: {start_time}, stop: {stop_time})
        |> filter(fn: (r) => r["_measurement"] == "{measurement}")
        |> filter(fn: (r) => r["ID"] == "{ID}")
        |> filter(fn: (r) => r["_field"] == "{property}")
    '''
    tables = query_api.query(flux_query)

    results = []
    for table in tables:
        for record in table.records:
            results.append({
                "time": record["_time"],
                "value": record["_value"],
            })
    return results


# Example usage
if __name__ == "__main__":
    url = "http://localhost:8086"  # replace with your InfluxDB instance address
    token = "Z4UZj9HuLwLlwoApywvT2nGVP3bwLy18y-sJQ7enzZlJd8YMzMWbBA6F-q4gBiZ-7-IqdxR5aR9LvicKiSNmnA=="  # replace with your InfluxDB token
    org_name = "beibei"  # replace with your organization name

    with InfluxDBClient(url=url, token=token, org=org_name) as client:
        bucket_name = "realtime_data"       # bucket to query
        node_id = "ZBBDTZDP000022"          # example node ID
        link_id = "ZBBGXSZW000002"          # example link ID (for query_latest_record_by_ID)

        curve_result = query_curve_by_ID_property_daterange(
            ID=node_id, type="node", property="head",
            start_date="2024-11-25", end_date="2024-11-25")
        print(curve_result)