89 lines
2.9 KiB
Python
89 lines
2.9 KiB
Python
import os
|
||
|
||
import pandas as pd
|
||
|
||
|
||
def fill_time_gaps(
|
||
data: pd.DataFrame,
|
||
time_col: str = "time",
|
||
freq: str = "1min",
|
||
short_gap_threshold: int = 10,
|
||
) -> pd.DataFrame:
|
||
"""
|
||
补齐缺失时间戳并填补数据缺口。
|
||
|
||
Args:
|
||
data: 包含时间列的 DataFrame
|
||
time_col: 时间列名(默认 'time')
|
||
freq: 重采样频率(默认 '1min')
|
||
short_gap_threshold: 短缺口阈值(分钟),<=此值用线性插值,>此值用前向填充
|
||
|
||
Returns:
|
||
补齐时间后的 DataFrame(保留原时间列格式)
|
||
"""
|
||
if time_col not in data.columns:
|
||
raise ValueError(f"时间列 '{time_col}' 不存在于数据中")
|
||
|
||
# 解析时间列并设为索引
|
||
data = data.copy()
|
||
data[time_col] = pd.to_datetime(data[time_col], utc=True)
|
||
data_indexed = data.set_index(time_col)
|
||
|
||
# 生成完整时间范围
|
||
full_range = pd.date_range(
|
||
start=data_indexed.index.min(), end=data_indexed.index.max(), freq=freq
|
||
)
|
||
|
||
# 重索引以补齐缺失时间点,同时保留原始时间戳
|
||
combined_index = data_indexed.index.union(full_range).sort_values().unique()
|
||
data_reindexed = data_indexed.reindex(combined_index)
|
||
|
||
# 按列处理缺口
|
||
for col in data_reindexed.columns:
|
||
# 识别缺失值位置
|
||
is_missing = data_reindexed[col].isna()
|
||
|
||
# 计算连续缺失的长度
|
||
missing_groups = (is_missing != is_missing.shift()).cumsum()
|
||
gap_lengths = is_missing.groupby(missing_groups).transform("sum")
|
||
|
||
# 短缺口:时间插值
|
||
short_gap_mask = is_missing & (gap_lengths <= short_gap_threshold)
|
||
if short_gap_mask.any():
|
||
data_reindexed.loc[short_gap_mask, col] = (
|
||
data_reindexed[col]
|
||
.interpolate(method="time", limit_area="inside")
|
||
.loc[short_gap_mask]
|
||
)
|
||
|
||
# 长缺口:前向填充
|
||
long_gap_mask = is_missing & (gap_lengths > short_gap_threshold)
|
||
if long_gap_mask.any():
|
||
data_reindexed.loc[long_gap_mask, col] = (
|
||
data_reindexed[col].ffill().loc[long_gap_mask]
|
||
)
|
||
|
||
# 重置索引并恢复时间列(保留原格式)
|
||
data_result = data_reindexed.reset_index()
|
||
data_result.rename(columns={"index": time_col}, inplace=True)
|
||
|
||
# 保留时区信息
|
||
data_result[time_col] = data_result[time_col].dt.strftime("%Y-%m-%dT%H:%M:%S%z")
|
||
# 修正时区格式(Python的%z输出为+0000,需转为+00:00)
|
||
data_result[time_col] = data_result[time_col].str.replace(
|
||
r"(\+\d{2})(\d{2})$", r"\1:\2", regex=True
|
||
)
|
||
|
||
return data_result
|
||
|
||
|
||
def _cleanup_temp_files(prefix: str) -> None:
|
||
"""清理 EPANET 仿真产生的临时文件。"""
|
||
for ext in [".inp", ".rpt", ".bin", ".out"]:
|
||
temp_file = prefix + ext
|
||
if os.path.exists(temp_file):
|
||
try:
|
||
os.remove(temp_file)
|
||
except OSError:
|
||
pass
|