"""FastAPI routes that expose InfluxDB simulation/SCADA queries.

Most endpoints are thin pass-throughs to ``influxdb_api``. Several date-based
endpoints additionally cache their results in Redis, serialized with msgpack
(``encode_datetime`` / ``decode_datetime`` handle datetime values).
"""

from typing import Any, List, Dict, Optional
import logging
from datetime import datetime, timedelta, timezone, time as dt_time

import msgpack
from fastapi import APIRouter
from pydantic import BaseModel
from py_linq import Enumerable

import app.infra.db.influxdb.api as influxdb_api
import app.services.time_api as time_api
from app.infra.cache.redis_client import redis_client, encode_datetime, decode_datetime

router = APIRouter()
logger = logging.getLogger(__name__)

# Sentinel distinguishing "nothing cached" from a cached value that is falsy/None.
_CACHE_MISS = object()


def _cache_load(cache_key: str) -> Any:
    """Return the unpacked value stored under ``cache_key`` or ``_CACHE_MISS``."""
    raw = redis_client.get(cache_key)
    if not raw:
        return _CACHE_MISS
    return msgpack.unpackb(raw, object_hook=decode_datetime)


def _cache_store(cache_key: str, value: Any) -> None:
    """Pack ``value`` with msgpack and store it under ``cache_key``."""
    redis_client.set(cache_key, msgpack.packb(value, default=encode_datetime))


def _records_by_date_with_type(querydate: str, querytype: str) -> Any:
    """Load (cache first, then InfluxDB) all records of a date for one type.

    NOTE(review): unlike ``fastapi_query_all_records_by_date`` this caches
    unconditionally, even when ``querydate`` is today -- confirm that is intended.
    """
    cache_key = f"queryallrecordsbydatewithtype_{querydate}_{querytype}"
    cached = _cache_load(cache_key)
    if cached is not _CACHE_MISS:
        return cached
    records = influxdb_api.query_all_records_by_date_with_type(
        query_date=querydate, query_type=querytype
    )
    _cache_store(cache_key, records)
    return records


def _scheme_all_records(schemetype: str, schemename: str, querydate: str) -> Any:
    """Load (cache first, then InfluxDB) all records of a scheme for a date.

    NOTE(review): cached unconditionally, with no today/future check -- confirm.
    """
    cache_key = f"queryallschemeallrecords_{schemetype}_{schemename}_{querydate}"
    cached = _cache_load(cache_key)
    if cached is not _CACHE_MISS:
        return cached
    records = influxdb_api.query_scheme_all_record(
        scheme_Type=schemetype, scheme_name=schemename, query_date=querydate
    )
    _cache_store(cache_key, records)
    return records


# Basic Node/Link Latest Record Queries


@router.get("/querynodelatestrecordbyid/")
async def fastapi_query_node_latest_record_by_id(id: str) -> Any:
    """Delegate to ``influxdb_api.query_latest_record_by_ID`` with type "node"."""
    return influxdb_api.query_latest_record_by_ID(id, type="node")


@router.get("/querylinklatestrecordbyid/")
async def fastapi_query_link_latest_record_by_id(id: str) -> Any:
    """Delegate to ``influxdb_api.query_latest_record_by_ID`` with type "link"."""
    return influxdb_api.query_latest_record_by_ID(id, type="link")


@router.get("/queryscadalatestrecordbyid/")
async def fastapi_query_scada_latest_record_by_id(id: str) -> Any:
    """Delegate to ``influxdb_api.query_latest_record_by_ID`` with type "scada"."""
    return influxdb_api.query_latest_record_by_ID(id, type="scada")


# Time-based Queries


@router.get("/queryallrecordsbytime/")
async def fastapi_query_all_records_by_time(querytime: str) -> dict[str, list]:
    """Return ``{"nodes": ..., "links": ...}`` from the first two query results."""
    nodes_links = influxdb_api.query_all_records_by_time(query_time=querytime)
    return {"nodes": nodes_links[0], "links": nodes_links[1]}


@router.get("/queryallrecordsbytimeproperty/")
async def fastapi_query_all_record_by_time_property(
    querytime: str, type: str, property: str, bucket: str = "realtime_simulation_result"
) -> dict[str, list]:
    """Wrap ``influxdb_api.query_all_record_by_time_property`` in ``{"results": ...}``."""
    records = influxdb_api.query_all_record_by_time_property(
        query_time=querytime, type=type, property=property, bucket=bucket
    )
    return {"results": records}


@router.get("/queryallschemerecordsbytimeproperty/")
async def fastapi_query_all_scheme_record_by_time_property(
    querytime: str,
    type: str,
    property: str,
    schemename: str,
    bucket: str = "scheme_simulation_result",
) -> dict[str, list]:
    """
    Query all records of the given scheme at one point in time, for a single
    property of 'node' or 'link' elements.
    """
    records = influxdb_api.query_all_scheme_record_by_time_property(
        query_time=querytime,
        type=type,
        property=property,
        scheme_name=schemename,
        bucket=bucket,
    )
    return {"results": records}


@router.get("/querysimulationrecordsbyidtime/")
async def fastapi_query_simulation_record_by_ids_time(
    id: str, querytime: str, type: str, bucket: str = "realtime_simulation_result"
) -> dict[str, list]:
    """Wrap ``influxdb_api.query_simulation_result_by_ID_time`` in ``{"results": ...}``."""
    records = influxdb_api.query_simulation_result_by_ID_time(
        ID=id, type=type, query_time=querytime, bucket=bucket
    )
    return {"results": records}


@router.get("/queryschemesimulationrecordsbyidtime/")
async def fastapi_query_scheme_simulation_record_by_ids_time(
    scheme_name: str,
    id: str,
    querytime: str,
    type: str,
    bucket: str = "scheme_simulation_result",
) -> dict[str, list]:
    """Wrap ``influxdb_api.query_scheme_simulation_result_by_ID_time`` in ``{"results": ...}``."""
    records = influxdb_api.query_scheme_simulation_result_by_ID_time(
        scheme_name=scheme_name, ID=id, type=type, query_time=querytime, bucket=bucket
    )
    return {"results": records}


# Date-based Queries with Caching


@router.get("/queryallrecordsbydate/")
async def fastapi_query_all_records_by_date(querydate: str) -> dict:
    """Return node and link records for a date, using Redis for past dates only.

    When ``time_api.is_today_or_future(querydate)`` is true the cache is neither
    read nor written.
    """
    is_today_or_future = time_api.is_today_or_future(querydate)
    logger.info("isToday or future: %s", is_today_or_future)
    cache_key = f"queryallrecordsbydate_{querydate}"

    if not is_today_or_future:
        cached = _cache_load(cache_key)
        if cached is not _CACHE_MISS:
            logger.info("return from cache redis")
            return cached

    logger.info("query from influxdb")
    nodes_links = influxdb_api.query_all_records_by_date(query_date=querydate)
    results = {"nodes": nodes_links[0], "links": nodes_links[1]}

    if not is_today_or_future:
        logger.info("save to cache redis")
        _cache_store(cache_key, results)

    logger.info("return results")
    return results


@router.get("/queryallrecordsbytimerange/")
async def fastapi_query_all_records_by_time_range(
    starttime: str, endtime: str
) -> dict[str, list]:
    """Return node and link records for a time range, cached for past ranges.

    The cache is used only when neither bound is reported as today-or-future by
    ``time_api.is_today_or_future``; a range that ends today or later is still
    incomplete and must not be stored permanently.
    """
    cache_key = f"queryallrecordsbytimerange_{starttime}_{endtime}"
    cacheable = not (
        time_api.is_today_or_future(starttime)
        or time_api.is_today_or_future(endtime)
    )

    if cacheable:
        cached = _cache_load(cache_key)
        if cached is not _CACHE_MISS:
            return cached

    nodes_links = influxdb_api.query_all_records_by_time_range(
        starttime=starttime, endtime=endtime
    )
    results = {"nodes": nodes_links[0], "links": nodes_links[1]}

    if cacheable:
        _cache_store(cache_key, results)
    return results


@router.get("/queryallrecordsbydatewithtype/")
async def fastapi_query_all_records_by_date_with_type(
    querydate: str, querytype: str
) -> list:
    """Return all records of ``querytype`` for ``querydate`` (Redis-cached)."""
    return _records_by_date_with_type(querydate, querytype)


@router.get("/queryallrecordsbyidsdatetype/")
async def fastapi_query_all_records_by_ids_date_type(
    ids: str, querydate: str, querytype: str
) -> list:
    """Return the date/type records whose ``"ID"`` is in the comma-separated ``ids``.

    Shares its cache entry with ``fastapi_query_all_records_by_date_with_type``
    and filters the full record list in memory.
    """
    records = _records_by_date_with_type(querydate, querytype)
    # Build the lookup set once; membership is then O(1) per record.
    wanted_ids = set(ids.split(","))
    return Enumerable(records).where(lambda rec: rec["ID"] in wanted_ids).to_list()


@router.get("/queryallrecordsbydateproperty/")
async def fastapi_query_all_records_by_date_property(
    querydate: str, querytype: str, property: str
) -> list[dict]:
    """Return one property of all ``querytype`` records for a date (Redis-cached).

    NOTE(review): cached unconditionally, with no today/future check -- confirm.
    """
    cache_key = f"queryallrecordsbydateproperty_{querydate}_{querytype}_{property}"
    cached = _cache_load(cache_key)
    if cached is not _CACHE_MISS:
        return cached
    records = influxdb_api.query_all_record_by_date_property(
        query_date=querydate, type=querytype, property=property
    )
    _cache_store(cache_key, records)
    return records


# Curve Queries


@router.get("/querynodecurvebyidpropertydaterange/")
async def fastapi_query_node_curve_by_id_property_daterange(
    id: str, prop: str, startdate: str, enddate: str
):
    """Delegate to ``influxdb_api.query_curve_by_ID_property_daterange`` for a node."""
    return influxdb_api.query_curve_by_ID_property_daterange(
        id, type="node", property=prop, start_date=startdate, end_date=enddate
    )


@router.get("/querylinkcurvebyidpropertydaterange/")
async def fastapi_query_link_curve_by_id_property_daterange(
    id: str, prop: str, startdate: str, enddate: str
):
    """Delegate to ``influxdb_api.query_curve_by_ID_property_daterange`` for a link."""
    return influxdb_api.query_curve_by_ID_property_daterange(
        id, type="link", property=prop, start_date=startdate, end_date=enddate
    )


# SCADA Data Queries


@router.get("/queryscadadatabydeviceidandtime/")
async def fastapi_query_scada_data_by_device_id_and_time(ids: str, querytime: str):
    """Query SCADA data at ``querytime`` for the comma-separated device ``ids``."""
    query_ids = ids.split(",")
    logger.info("%s", querytime)
    return influxdb_api.query_SCADA_data_by_device_ID_and_time(
        query_ids_list=query_ids, query_time=querytime
    )


@router.get("/queryscadadatabydeviceidandtimerange/")
async def fastapi_query_scada_data_by_device_id_and_time_range(
    ids: str, starttime: str, endtime: str
):
    """Query SCADA data in a time range for the comma-separated device ``ids``."""
    logger.info("query_ids: %s, starttime: %s, endtime: %s", ids, starttime, endtime)
    return influxdb_api.query_SCADA_data_by_device_ID_and_timerange(
        query_ids_list=ids.split(","), start_time=starttime, end_time=endtime
    )


@router.get("/queryfillingscadadatabydeviceidandtimerange/")
async def fastapi_query_filling_scada_data_by_device_id_and_time_range(
    ids: str, starttime: str, endtime: str
):
    """Delegate to ``influxdb_api.query_filling_SCADA_data_by_device_ID_and_timerange``."""
    logger.info("query_ids: %s, starttime: %s, endtime: %s", ids, starttime, endtime)
    return influxdb_api.query_filling_SCADA_data_by_device_ID_and_timerange(
        query_ids_list=ids.split(","), start_time=starttime, end_time=endtime
    )


@router.get("/querycleaningscadadatabydeviceidandtimerange/")
async def fastapi_query_cleaning_scada_data_by_device_id_and_time_range(
    ids: str, starttime: str, endtime: str
):
    """Delegate to ``influxdb_api.query_cleaning_SCADA_data_by_device_ID_and_timerange``."""
    logger.info("query_ids: %s, starttime: %s, endtime: %s", ids, starttime, endtime)
    return influxdb_api.query_cleaning_SCADA_data_by_device_ID_and_timerange(
        query_ids_list=ids.split(","), start_time=starttime, end_time=endtime
    )


@router.get("/querysimulationscadadatabydeviceidandtimerange/")
async def fastapi_query_simulation_scada_data_by_device_id_and_time_range(
    ids: str, starttime: str, endtime: str
):
    """Delegate to ``influxdb_api.query_simulation_SCADA_data_by_device_ID_and_timerange``."""
    logger.info("query_ids: %s, starttime: %s, endtime: %s", ids, starttime, endtime)
    return influxdb_api.query_simulation_SCADA_data_by_device_ID_and_timerange(
        query_ids_list=ids.split(","), start_time=starttime, end_time=endtime
    )


@router.get("/querycleanedscadadatabydeviceidandtimerange/")
async def fastapi_query_cleaned_scada_data_by_device_id_and_time_range(
    ids: str, starttime: str, endtime: str
):
    """Delegate to ``influxdb_api.query_cleaned_SCADA_data_by_device_ID_and_timerange``."""
    logger.info("query_ids: %s, starttime: %s, endtime: %s", ids, starttime, endtime)
    return influxdb_api.query_cleaned_SCADA_data_by_device_ID_and_timerange(
        query_ids_list=ids.split(","), start_time=starttime, end_time=endtime
    )


@router.get("/queryscadadatabydeviceidanddate/")
async def fastapi_query_scada_data_by_device_id_and_date(ids: str, querydate: str):
    """Query SCADA data on ``querydate`` for the comma-separated device ``ids``."""
    return influxdb_api.query_SCADA_data_by_device_ID_and_date(
        query_ids_list=ids.split(","), query_date=querydate
    )


@router.get("/queryallscadarecordsbydate/")
async def fastapi_query_all_scada_records_by_date(querydate: str):
    """Return all SCADA records for a date, using Redis for past dates only.

    When ``time_api.is_today_or_future(querydate)`` is true the cache is neither
    read nor written.
    """
    is_today_or_future = time_api.is_today_or_future(querydate)
    logger.info("isToday or future: %s", is_today_or_future)
    cache_key = f"queryallscadarecordsbydate_{querydate}"

    if not is_today_or_future:
        cached = _cache_load(cache_key)
        if cached is not _CACHE_MISS:
            logger.info("return from cache redis")
            return cached

    logger.info("query from influxdb")
    records = influxdb_api.query_all_SCADA_records_by_date(query_date=querydate)

    if not is_today_or_future:
        logger.info("save to cache redis")
        _cache_store(cache_key, records)

    logger.info("return results")
    return records


@router.get("/queryallschemeallrecords/")
async def fastapi_query_all_scheme_all_records(
    schemetype: str, schemename: str, querydate: str
) -> tuple:
    """Return all records of a scheme for ``querydate`` (Redis-cached)."""
    return _scheme_all_records(schemetype, schemename, querydate)


@router.get("/queryschemeallrecordsproperty/")
async def fastapi_query_all_scheme_all_records_property(
    schemetype: str, schemename: str, querydate: str, querytype: str, queryproperty: str
) -> Optional[List]:
    """Return the node or link part of a scheme's records for ``querydate``.

    Shares its cache entry with ``fastapi_query_all_scheme_all_records``.
    Returns element 0 for ``querytype == "node"``, element 1 for ``"link"`` and
    ``None`` for any other ``querytype``.

    NOTE(review): ``queryproperty`` is accepted but not used by this handler.
    """
    all_records = _scheme_all_records(schemetype, schemename, querydate)
    if querytype == "node":
        return all_records[0]
    if querytype == "link":
        return all_records[1]
    return None


@router.get("/queryinfluxdbbuckets/")
async def fastapi_query_influxdb_buckets():
    """Return the result of ``influxdb_api.query_buckets()``."""
    return influxdb_api.query_buckets()
@router.get("/queryinfluxdbbucketmeasurements/")
async def fastapi_query_influxdb_bucket_measurements(bucket: str):
    """Return the result of ``influxdb_api.query_measurements`` for ``bucket``."""
    return influxdb_api.query_measurements(bucket=bucket)


############################################################
# download history data
############################################################


class Download_History_Data_Manually(BaseModel):
    """
    Request body for the manual history-data download endpoint.

    download_date: for example ``datetime(2025, 5, 4)``
    """

    download_date: datetime


@router.post("/download_history_data_manually/")
async def fastapi_download_history_data_manually(
    data: Download_History_Data_Manually,
) -> None:
    """Trigger a history-data download covering one calendar day.

    The window runs from 00:00:00 to 23:59:59 of ``data.download_date``'s date,
    stamped with a fixed UTC+08:00 offset, and is passed to
    ``influxdb_api.download_history_data_manually`` as ISO-8601 strings.
    """
    # Read the validated field directly instead of round-tripping through a dict.
    target_date = data.download_date.date()
    tz = timezone(timedelta(hours=8))

    begin_dt = datetime.combine(target_date, dt_time.min, tzinfo=tz)
    end_dt = datetime.combine(target_date, dt_time(23, 59, 59), tzinfo=tz)

    influxdb_api.download_history_data_manually(
        begin_time=begin_dt.isoformat(), end_time=end_dt.isoformat()
    )