Files

89 lines
2.9 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import os
import pandas as pd
def fill_time_gaps(
data: pd.DataFrame,
time_col: str = "time",
freq: str = "1min",
short_gap_threshold: int = 10,
) -> pd.DataFrame:
"""
补齐缺失时间戳并填补数据缺口。
Args:
data: 包含时间列的 DataFrame
time_col: 时间列名(默认 'time'
freq: 重采样频率(默认 '1min'
short_gap_threshold: 短缺口阈值(分钟),<=此值用线性插值,>此值用前向填充
Returns:
补齐时间后的 DataFrame(保留原时间列格式)
"""
if time_col not in data.columns:
raise ValueError(f"时间列 '{time_col}' 不存在于数据中")
# 解析时间列并设为索引
data = data.copy()
data[time_col] = pd.to_datetime(data[time_col], utc=True)
data_indexed = data.set_index(time_col)
# 生成完整时间范围
full_range = pd.date_range(
start=data_indexed.index.min(), end=data_indexed.index.max(), freq=freq
)
# 重索引以补齐缺失时间点,同时保留原始时间戳
combined_index = data_indexed.index.union(full_range).sort_values().unique()
data_reindexed = data_indexed.reindex(combined_index)
# 按列处理缺口
for col in data_reindexed.columns:
# 识别缺失值位置
is_missing = data_reindexed[col].isna()
# 计算连续缺失的长度
missing_groups = (is_missing != is_missing.shift()).cumsum()
gap_lengths = is_missing.groupby(missing_groups).transform("sum")
# 短缺口:时间插值
short_gap_mask = is_missing & (gap_lengths <= short_gap_threshold)
if short_gap_mask.any():
data_reindexed.loc[short_gap_mask, col] = (
data_reindexed[col]
.interpolate(method="time", limit_area="inside")
.loc[short_gap_mask]
)
# 长缺口:前向填充
long_gap_mask = is_missing & (gap_lengths > short_gap_threshold)
if long_gap_mask.any():
data_reindexed.loc[long_gap_mask, col] = (
data_reindexed[col].ffill().loc[long_gap_mask]
)
# 重置索引并恢复时间列(保留原格式)
data_result = data_reindexed.reset_index()
data_result.rename(columns={"index": time_col}, inplace=True)
# 保留时区信息
data_result[time_col] = data_result[time_col].dt.strftime("%Y-%m-%dT%H:%M:%S%z")
# 修正时区格式(Python的%z输出为+0000,需转为+00:00
data_result[time_col] = data_result[time_col].str.replace(
r"(\+\d{2})(\d{2})$", r"\1:\2", regex=True
)
return data_result
def _cleanup_temp_files(prefix: str) -> None:
"""清理 EPANET 仿真产生的临时文件。"""
for ext in [".inp", ".rpt", ".bin", ".out"]:
temp_file = prefix + ext
if os.path.exists(temp_file):
try:
os.remove(temp_file)
except OSError:
pass