Compare commits

...

4 Commits

Author SHA1 Message Date
6434cae21c 统一scheme_type命名 2026-02-05 15:39:56 +08:00
a85ff8e215 copilot项目描述文件 2026-02-05 10:47:54 +08:00
2794114000 统一scheme_name命名规则 2026-02-05 10:47:38 +08:00
4c208abe55 优化关阀分析算法,实现网络拓扑缓存,增量图处理 2026-02-05 10:46:46 +08:00
11 changed files with 489 additions and 185 deletions

200
.github/copilot-instructions.md vendored Normal file
View File

@@ -0,0 +1,200 @@
# TJWater Server - Copilot Instructions
This is a FastAPI-based water network management system (供水管网智能管理系统) that provides hydraulic simulation, SCADA data integration, network element management, and risk analysis capabilities.
## Running the Server
```bash
# Install dependencies
pip install -r requirements.txt
# Start the server (default: http://0.0.0.0:8000 with 2 workers)
python scripts/run_server.py
# Note: On Windows, the script automatically sets WindowsSelectorEventLoopPolicy
```
## Running Tests
```bash
# Run all tests
pytest
# Run a specific test file with verbose output
pytest tests/unit/test_pipeline_health_analyzer.py -v
# Run from conftest helper
python tests/conftest.py
```
## Architecture Overview
### Core Components
1. **Native Modules** (`app/native/`): Platform-specific compiled extensions (`.so` for Linux, `.pyd` for Windows) providing performance-critical functionality including:
- SCADA device integration
- Water distribution analysis (WDA)
- Pipe risk probability calculations
- Wrapped through `app.services.tjnetwork` interface
2. **Services Layer** (`app/services/`):
- `tjnetwork.py`: Main network API wrapper around native modules
- `simulation.py`: Hydraulic simulation orchestration (EPANET integration)
- `project_info.py`: Project configuration management
- `epanet/`: EPANET hydraulic engine integration
3. **API Layer** (`app/api/v1/`):
- **Network Elements**: Separate endpoint modules for junctions, reservoirs, tanks, pipes, pumps, valves
- **Components**: Curves, patterns, controls, options, quality, visuals
- **Network Features**: Tags, demands, geometry, regions/DMAs
- **Core Services**: Auth, project, simulation, SCADA, data query, snapshots
4. **Database Infrastructure** (`app/infra/db/`):
- **PostgreSQL**: Primary relational database (users, audit logs, project metadata)
- **TimescaleDB**: Time-series extension for historical data
- **InfluxDB**: Optional time-series database for high-frequency SCADA data
- Connection pools initialized in `main.py` lifespan context
- Database instance stored in `app.state.db` for dependency injection
5. **Domain Layer** (`app/domain/`):
- `models/`: Enums and domain objects (e.g., `UserRole`)
- `schemas/`: Pydantic models for request/response validation
6. **Algorithms** (`app/algorithms/`):
- `api_ex/`: Analysis algorithms (k-means sensor placement, sensitivity analysis, pipeline health)
- `data_cleaning.py`: Data preprocessing utilities
- `simulations.py`: Simulation helpers
### Security & Authentication
- **Authentication**: JWT-based with access tokens (30 min) and refresh tokens (7 days)
- **Authorization**: Role-based access control (RBAC) with 4 roles:
- `VIEWER`: Read-only access
- `USER`: Read-write access
- `OPERATOR`: Modify data
- `ADMIN`: Full permissions
- **Audit Logging**: `AuditMiddleware` automatically logs POST/PUT/DELETE requests
- **Encryption**: Fernet symmetric encryption for sensitive data (`app.core.encryption`)
Default admin accounts:
- `admin` / `admin123`
- `tjwater` / `tjwater@123`
### Key Files
- `app/main.py`: FastAPI app initialization, lifespan (DB pools), CORS, middleware, router mounting
- `app/api/v1/router.py`: Central router aggregating all endpoint modules
- `app/core/config.py`: Settings management using `pydantic-settings`
- `app/auth/dependencies.py`: Auth dependencies (`get_current_active_user`, `get_db`)
- `app/auth/permissions.py`: Permission decorators (`require_role`, `get_current_admin`)
- `configs/project_info.yml`: Default project configuration (auto-loaded on startup)
- `.env`: Environment configuration (database credentials, JWT secret, encryption key)
## Important Conventions
### Database Connections
- Database instances are initialized in `main.py` lifespan and stored in `app.state.db`
- Access via dependency injection:
```python
from app.auth.dependencies import get_db
async def endpoint(db = Depends(get_db)):
# Use db connection
```
### Authentication in Endpoints
Use dependency injection for auth requirements:
```python
from app.auth.dependencies import get_current_active_user
from app.auth.permissions import require_role, get_current_admin
from app.domain.models.role import UserRole
# Require any authenticated user
@router.get("/data")
async def get_data(current_user = Depends(get_current_active_user)):
return data
# Require specific role (USER or higher)
@router.post("/data")
async def create_data(current_user = Depends(require_role(UserRole.USER))):
return result
# Admin-only access
@router.delete("/data/{id}")
async def delete_data(id: int, current_user = Depends(get_current_admin)):
return result
```
### API Routing Structure
- All v1 APIs are mounted under `/api/v1` prefix via `api_router`
- Legacy routes without version prefix are also mounted for backward compatibility
- Group related endpoints in separate router modules under `app/api/v1/endpoints/`
- Use descriptive tags in `router.py` for OpenAPI documentation grouping
### Native Module Integration
- Native modules are pre-compiled for specific platforms
- Always import through `app.native.api` or `app.services.tjnetwork`
- The `tjnetwork` service wraps native APIs with constants like:
- Element types: `JUNCTION`, `RESERVOIR`, `TANK`, `PIPE`, `PUMP`, `VALVE`
- Operations: `API_ADD`, `API_UPDATE`, `API_DELETE`
- `ChangeSet` for batch operations
### Project Initialization
- On startup, `main.py` automatically loads project from `project_info.name` if set
- Projects are opened via `open_project(name)` from `tjnetwork` service
- Initial project config comes from `configs/project_info.yml`
### Audit Logging
Manual audit logging for critical operations:
```python
from app.core.audit import log_audit_event, AuditAction
await log_audit_event(
action=AuditAction.UPDATE,
user_id=current_user.id,
username=current_user.username,
resource_type="resource_name",
resource_id=str(resource_id),
ip_address=request.client.host,
request_data=data
)
```
### Environment Configuration
- Copy `.env.example` to `.env` before first run
- Required environment variables:
- `SECRET_KEY`: JWT signing (generate with `openssl rand -hex 32`)
- `ENCRYPTION_KEY`: Data encryption (generate with Fernet)
- Database credentials for PostgreSQL, TimescaleDB, and optionally InfluxDB
### Database Migrations
SQL migration scripts are in `migrations/`:
- `001_create_users_table.sql`: User authentication tables
- `002_create_audit_logs_table.sql`: Audit logging tables
Apply with:
```bash
psql -U postgres -d tjwater -f migrations/001_create_users_table.sql
```
## API Documentation
- Swagger UI: http://localhost:8000/docs
- ReDoc: http://localhost:8000/redoc
- OpenAPI schema: http://localhost:8000/openapi.json
## Additional Resources
- `SECURITY_README.md`: Comprehensive security feature documentation
- `DEPLOYMENT.md`: Integration guide for security features
- `readme.md`: Project overview and directory structure (in Chinese)

View File

@@ -199,7 +199,7 @@ def valve_close_analysis(
modify_pattern_start_time: str,
modify_total_duration: int = 900,
modify_valve_opening: dict[str, float] = None,
scheme_Name: str = None,
scheme_name: str = None,
) -> None:
"""
关阀模拟
@@ -207,7 +207,7 @@ def valve_close_analysis(
:param modify_pattern_start_time: 模拟开始时间,格式为'2024-11-25T09:00:00+08:00'
:param modify_total_duration: 模拟总历时,秒
:param modify_valve_opening: dict中包含多个阀门开启度str为阀门的idfloat为修改后的阀门开启度
:param scheme_Name: 方案名称
:param scheme_name: 方案名称
:return:
"""
print(
@@ -261,8 +261,8 @@ def valve_close_analysis(
modify_pattern_start_time=modify_pattern_start_time,
modify_total_duration=modify_total_duration,
modify_valve_opening=modify_valve_opening,
scheme_Type="valve_close_Analysis",
scheme_Name=scheme_Name,
scheme_type="valve_close_Analysis",
scheme_name=scheme_name,
)
# step 3. restore the base model
# for valve in valves:
@@ -284,7 +284,7 @@ def flushing_analysis(
modify_valve_opening: dict[str, float] = None,
drainage_node_ID: str = None,
flushing_flow: float = 0,
scheme_Name: str = None,
scheme_name: str = None,
) -> None:
"""
管道冲洗模拟
@@ -294,9 +294,15 @@ def flushing_analysis(
:param modify_valve_opening: dict中包含多个阀门开启度str为阀门的idfloat为修改后的阀门开启度
:param drainage_node_ID: 冲洗排放口所在节点ID
:param flushing_flow: 冲洗水量传入参数单位为m3/h
:param scheme_Name: 方案名称
:param scheme_name: 方案名称
:return:
"""
scheme_detail: dict = {
"duration": modify_total_duration,
"valve_opening": modify_valve_opening,
"drainage_node_ID": drainage_node_ID,
"flushing_flow": flushing_flow,
}
print(
datetime.now(pytz.timezone("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S")
+ " -- Start Analysis."
@@ -386,14 +392,23 @@ def flushing_analysis(
modify_pattern_start_time=modify_pattern_start_time,
modify_total_duration=modify_total_duration,
modify_valve_opening=modify_valve_opening,
scheme_Type="flushing_Analysis",
scheme_Name=scheme_Name,
scheme_type="flushing_analysis",
scheme_name=scheme_name,
)
# step 4. restore the base model
if is_project_open(new_name):
close_project(new_name)
delete_project(new_name)
# return result
# 存储方案信息到 PG 数据库
store_scheme_info(
name=name,
scheme_name=scheme_name,
scheme_type="flushing_analysis",
username="admin",
scheme_start_time=modify_pattern_start_time,
scheme_detail=scheme_detail,
)
############################################################
@@ -415,10 +430,10 @@ def contaminant_simulation(
:param modify_pattern_start_time: 模拟开始时间,格式为'2024-11-25T09:00:00+08:00'
:param modify_total_duration: 模拟总历时,秒
:param source: 污染源所在的节点ID
:param concentration: 污染源位置处的浓度单位mg/L,即默认的污染模拟setting为concentration应改为 Set point booster
:param concentration: 污染源位置处的浓度单位mg/L默认的污染模拟setting为SOURCE_TYPE_CONCEN改为SOURCE_TYPE_SETPOINT
:param source_pattern: 污染源的时间变化模式若不传入则默认以恒定浓度持续模拟时间长度等于duration;
若传入,则格式为{1.0,0.5,1.1}等系数列表pattern_step模拟等于模型的hydraulic time step
:param scheme_Name: 方案名称
:param scheme_name: 方案名称
:return:
"""
scheme_detail: dict = {
@@ -490,7 +505,7 @@ def contaminant_simulation(
cs_source = ChangeSet()
source_schema = {
"node": source,
"s_type": SOURCE_TYPE_CONCEN,
"s_type": SOURCE_TYPE_SETPOINT,
"strength": concentration,
"pattern": pt["id"],
}
@@ -634,7 +649,7 @@ def pressure_regulation(
modify_tank_initial_level: dict[str, float] = None,
modify_fixed_pump_pattern: dict[str, list] = None,
modify_variable_pump_pattern: dict[str, list] = None,
scheme_Name: str = None,
scheme_name: str = None,
) -> None:
"""
区域调压模拟用来模拟未来15分钟内开关水泵对区域压力的影响
@@ -644,7 +659,7 @@ def pressure_regulation(
:param modify_tank_initial_level: dict中包含多个水塔str为水塔的idfloat为修改后的initial_level
:param modify_fixed_pump_pattern: dict中包含多个水泵模式str为工频水泵的idlist为修改后的pattern
:param modify_variable_pump_pattern: dict中包含多个水泵模式str为变频水泵的idlist为修改后的pattern
:param scheme_Name: 模拟方案名称
:param scheme_name: 模拟方案名称
:return:
"""
print(
@@ -696,8 +711,8 @@ def pressure_regulation(
modify_tank_initial_level=modify_tank_initial_level,
modify_fixed_pump_pattern=modify_fixed_pump_pattern,
modify_variable_pump_pattern=modify_variable_pump_pattern,
scheme_Type="pressure_regulation",
scheme_Name=scheme_Name,
scheme_type="pressure_regulation",
scheme_name=scheme_name,
)
if is_project_open(new_name):
close_project(new_name)

View File

@@ -1,10 +1,11 @@
from collections import defaultdict, deque
from functools import lru_cache
import time
from typing import Any
from app.services.tjnetwork import (
get_network_link_nodes,
is_node,
is_link,
get_link_properties,
)
@@ -19,48 +20,102 @@ def _parse_link_entry(link_entry: str) -> tuple[str, str, str, str]:
return parts[0], parts[1], parts[2], parts[3]
@lru_cache(maxsize=16)
def _get_network_topology(network: str):
"""
解析并缓存网络拓扑,大幅减少重复的 API 调用和字符串解析开销。
返回:
- pipe_adj: 永久连通的管道/泵邻接表 (dict[str, set])
- all_valves: 所有阀门字典 {id: (n1, n2)}
- link_lookup: 链路快速查表 {id: (n1, n2, type)} 用于快速定位事故点
- node_set: 所有已知节点集合
"""
pipe_adj = defaultdict(set)
all_valves = {}
link_lookup = {}
node_set = set()
# 此处假设 get_network_link_nodes 获取全网数据
for link_entry in get_network_link_nodes(network):
link_id, link_type, node1, node2 = _parse_link_entry(link_entry)
link_type_name = str(link_type).lower()
link_lookup[link_id] = (node1, node2, link_type_name)
node_set.add(node1)
node_set.add(node2)
if link_type_name == VALVE_LINK_TYPE:
all_valves[link_id] = (node1, node2)
else:
# 只有非阀门(管道/泵)才进入永久连通图
pipe_adj[node1].add(node2)
pipe_adj[node2].add(node1)
return pipe_adj, all_valves, link_lookup, node_set
def valve_isolation_analysis(
network: str, accident_elements: str | list[str]
network: str, accident_elements: str | list[str], disabled_valves: list[str] = None
) -> dict[str, Any]:
"""
关阀搜索/分析:基于拓扑结构确定事故隔离所需关阀。
:param network: 模型名称
:param accident_elements: 事故点(节点或管道/泵/阀门ID可以是单个ID字符串或ID列表
:param disabled_valves: 故障/无法关闭的阀门ID列表
:return: dict包含受影响节点、必须关闭阀门、可选阀门等信息
"""
if disabled_valves is None:
disabled_valves_set = set()
else:
disabled_valves_set = set(disabled_valves)
if isinstance(accident_elements, str):
target_elements = [accident_elements]
else:
target_elements = accident_elements
start_nodes = set()
# 1. 获取缓存拓扑 (极快,无 IO)
pipe_adj, all_valves, link_lookup, node_set = _get_network_topology(network)
# 2. 确定起点,优先查表避免 API 调用
start_nodes = set()
for element in target_elements:
if element in node_set:
start_nodes.add(element)
elif element in link_lookup:
n1, n2, _ = link_lookup[element]
start_nodes.add(n1)
start_nodes.add(n2)
else:
# 仅当缓存中没找到时(极少见),才回退到慢速 API
if is_node(network, element):
start_nodes.add(element)
elif is_link(network, element):
link_props = get_link_properties(network, element)
node1 = link_props.get("node1")
node2 = link_props.get("node2")
if not node1 or not node2:
# 如果是批量处理,可以选择跳过错误或记录错误,这里暂时保持严谨抛出异常
raise ValueError(f"Accident link {element} missing node endpoints")
start_nodes.add(node1)
start_nodes.add(node2)
else:
raise ValueError(f"Accident element {element} not found")
props = get_link_properties(network, element)
n1, n2 = props.get("node1"), props.get("node2")
if n1 and n2:
start_nodes.add(n1)
start_nodes.add(n2)
else:
raise ValueError(
f"Accident element {element} invalid or missing endpoints"
)
adjacency: dict[str, set[str]] = defaultdict(set)
valve_links: dict[str, tuple[str, str]] = {}
for link_entry in get_network_link_nodes(network):
link_id, link_type, node1, node2 = _parse_link_entry(link_entry)
link_type_name = str(link_type).lower()
if link_type_name == VALVE_LINK_TYPE:
valve_links[link_id] = (node1, node2)
continue
adjacency[node1].add(node2)
adjacency[node2].add(node1)
# 3. 处理故障阀门 (构建临时增量图)
# 我们不修改 cached pipe_adj而是建立一个 extra_adj
extra_adj = defaultdict(list)
boundary_valves = {} # 当前有效的边界阀门
for vid, (n1, n2) in all_valves.items():
if vid in disabled_valves_set:
# 故障阀门:视为连通管道
extra_adj[n1].append(n2)
extra_adj[n2].append(n1)
else:
# 正常阀门:视为潜在边界
boundary_valves[vid] = (n1, n2)
# 4. BFS 搜索 (叠加 pipe_adj 和 extra_adj)
affected_nodes: set[str] = set()
queue = deque(start_nodes)
while queue:
@@ -68,18 +123,29 @@ def valve_isolation_analysis(
if node in affected_nodes:
continue
affected_nodes.add(node)
for neighbor in adjacency.get(node, []):
# 遍历永久管道邻居
if node in pipe_adj:
for neighbor in pipe_adj[node]:
if neighbor not in affected_nodes:
queue.append(neighbor)
# 遍历故障阀门带来的额外邻居
if node in extra_adj:
for neighbor in extra_adj[node]:
if neighbor not in affected_nodes:
queue.append(neighbor)
# 5. 结果聚合
must_close_valves: list[str] = []
optional_valves: list[str] = []
for valve_id, (node1, node2) in valve_links.items():
in_node1 = node1 in affected_nodes
in_node2 = node2 in affected_nodes
if in_node1 and in_node2:
for valve_id, (n1, n2) in boundary_valves.items():
in_n1 = n1 in affected_nodes
in_n2 = n2 in affected_nodes
if in_n1 and in_n2:
optional_valves.append(valve_id)
elif in_node1 or in_node2:
elif in_n1 or in_n2:
must_close_valves.append(valve_id)
must_close_valves.sort()
@@ -87,6 +153,7 @@ def valve_isolation_analysis(
result = {
"accident_elements": target_elements,
"disabled_valves": disabled_valves,
"affected_nodes": sorted(affected_nodes),
"must_close_valves": must_close_valves,
"optional_valves": optional_valves,

View File

@@ -316,7 +316,7 @@ async def fastapi_query_all_scheme_all_records(
return loaded_dict
results = influxdb_api.query_scheme_all_record(
scheme_Type=schemetype, scheme_Name=schemename, query_date=querydate
scheme_type=schemetype, scheme_name=schemename, query_date=querydate
)
packed = msgpack.packb(results, default=encode_datetime)
redis_client.set(cache_key, packed)
@@ -334,7 +334,7 @@ async def fastapi_query_all_scheme_all_records_property(
all_results = msgpack.unpackb(data, object_hook=decode_datetime)
else:
all_results = influxdb_api.query_scheme_all_record(
scheme_Type=schemetype, scheme_Name=schemename, query_date=querydate
scheme_type=schemetype, scheme_name=schemename, query_date=querydate
)
packed = msgpack.packb(all_results, default=encode_datetime)
redis_client.set(cache_key, packed)

View File

@@ -22,7 +22,7 @@ async def get_all_extension_data_endpoint(network: str) -> dict[str, Any]:
async def get_extension_data_endpoint(network: str, key: str) -> str | None:
return get_extension_data(network, key)
@router.post("/setextensiondata", response_model=None)
@router.post("/setextensiondata/", response_model=None)
async def set_extension_data_endpoint(network: str, req: Request) -> ChangeSet:
props = await req.json()
print(props)

View File

@@ -60,7 +60,7 @@ class BurstAnalysis(BaseModel):
modify_fixed_pump_pattern: Optional[dict[str, list]] = None
modify_variable_pump_pattern: Optional[dict[str, list]] = None
modify_valve_opening: Optional[dict[str, float]] = None
scheme_Name: Optional[str] = None
scheme_name: Optional[str] = None
class SchedulingAnalysis(BaseModel):
@@ -78,7 +78,7 @@ class PressureRegulation(BaseModel):
pump_control: dict
tank_init_level: Optional[dict] = None
duration: Optional[int] = 900
scheme_Name: Optional[str] = None
scheme_name: Optional[str] = None
class ProjectManagement(BaseModel):
@@ -239,9 +239,29 @@ async def fastapi_valve_close_analysis(
@router.get("/valve_isolation_analysis/")
async def valve_isolation_endpoint(
network: str, accident_element: List[str] = Query(...)
network: str,
accident_element: List[str] = Query(...),
disabled_valves: List[str] = Query(None),
):
return analyze_valve_isolation(network, accident_element)
result = {
"accident_element": "P461309",
"accident_elements": ["P461309"],
"affected_nodes": [
"J316629_A",
"J317037_B",
"J317060_B",
"J408189_B",
"J499996",
"J524940",
"J535933",
"J58841",
],
"isolatable": True,
"must_close_valves": ["210521658", "V12974", "V12986", "V12993"],
"optional_valves": [],
}
result = analyze_valve_isolation(network, accident_element, disabled_valves)
return result
@router.get("/flushinganalysis/")
@@ -342,7 +362,7 @@ async def fastapi_pressure_regulation(data: PressureRegulation) -> str:
modify_tank_initial_level=item["tank_init_level"],
modify_fixed_pump_pattern=fixed_pump_pattern or None,
modify_variable_pump_pattern=variable_pump_pattern or None,
scheme_Name=item["scheme_Name"],
scheme_name=item["scheme_name"],
)
return "success"

View File

@@ -404,8 +404,8 @@ def create_and_initialize_buckets(org_name: str) -> None:
Point("link")
.tag("date", None)
.tag("ID", None)
.tag("scheme_Type", None)
.tag("scheme_Name", None)
.tag("scheme_type", None)
.tag("scheme_name", None)
.field("flow", 0.0)
.field("leakage", 0.0)
.field("velocity", 0.0)
@@ -420,8 +420,8 @@ def create_and_initialize_buckets(org_name: str) -> None:
Point("node")
.tag("date", None)
.tag("ID", None)
.tag("scheme_Type", None)
.tag("scheme_Name", None)
.tag("scheme_type", None)
.tag("scheme_name", None)
.field("head", 0.0)
.field("pressure", 0.0)
.field("actualdemand", 0.0)
@@ -436,8 +436,8 @@ def create_and_initialize_buckets(org_name: str) -> None:
.tag("date", None)
.tag("description", None)
.tag("device_ID", None)
.tag("scheme_Type", None)
.tag("scheme_Name", None)
.tag("scheme_type", None)
.tag("scheme_name", None)
.field("monitored_value", 0.0)
.field("datacleaning_value", 0.0)
.field("scheme_simulation_value", 0.0)
@@ -1811,8 +1811,8 @@ def query_SCADA_data_by_device_ID_and_time(
def query_scheme_SCADA_data_by_device_ID_and_time(
query_ids_list: List[str],
query_time: str,
scheme_Type: str,
scheme_Name: str,
scheme_type: str,
scheme_name: str,
bucket: str = "scheme_simulation_result",
) -> Dict[str, float]:
"""
@@ -1843,7 +1843,7 @@ def query_scheme_SCADA_data_by_device_ID_and_time(
flux_query = f"""
from(bucket: "{bucket}")
|> range(start: {utc_start_time.isoformat()}, stop: {utc_stop_time.isoformat()})
|> filter(fn: (r) => r["device_ID"] == "{device_id}" and r["_field"] == "monitored_value" and r["scheme_Type"] == "{scheme_Type}" and r["scheme_Name"] == "{scheme_Name}")
|> filter(fn: (r) => r["device_ID"] == "{device_id}" and r["_field"] == "monitored_value" and r["scheme_type"] == "{scheme_type}" and r["scheme_name"] == "{scheme_name}")
"""
# 执行查询
try:
@@ -2585,7 +2585,7 @@ def query_all_scheme_record_by_time_property(
flux_query = f"""
from(bucket: "{bucket}")
|> range(start: {utc_start_time.isoformat()}, stop: {utc_stop_time.isoformat()})
|> filter(fn: (r) => r["scheme_Name"] == "{scheme_name}" and r["_measurement"] == "{measurement}" and r["_field"] == "{property}")
|> filter(fn: (r) => r["scheme_name"] == "{scheme_name}" and r["_measurement"] == "{measurement}" and r["_field"] == "{property}")
"""
# 执行查询
tables = query_api.query(flux_query)
@@ -2635,7 +2635,7 @@ def query_scheme_simulation_result_by_ID_time(
flux_query = f"""
from(bucket: "{bucket}")
|> range(start: {utc_start_time.isoformat()}, stop: {utc_stop_time.isoformat()})
|> filter(fn: (r) => r["scheme_Name"] == "{scheme_name}" and r["_measurement"] == "node" and r["ID"] == "{ID}")
|> filter(fn: (r) => r["scheme_name"] == "{scheme_name}" and r["_measurement"] == "node" and r["ID"] == "{ID}")
|> pivot(
rowKey:["_time"],
columnKey:["_field"],
@@ -2660,7 +2660,7 @@ def query_scheme_simulation_result_by_ID_time(
flux_query = f"""
from(bucket: "{bucket}")
|> range(start: {utc_start_time.isoformat()}, stop: {utc_stop_time.isoformat()})
|> filter(fn: (r) => r["scheme_Name"] == "{scheme_name}" and r["_measurement"] == "link" and r["ID"] == "{ID}")
|> filter(fn: (r) => r["scheme_name"] == "{scheme_name}" and r["_measurement"] == "link" and r["ID"] == "{ID}")
|> pivot(
rowKey:["_time"],
columnKey:["_field"],
@@ -3227,8 +3227,8 @@ def store_scheme_simulation_result_to_influxdb(
link_result_list: List[Dict[str, any]],
scheme_start_time: str,
num_periods: int = 1,
scheme_Type: str = None,
scheme_Name: str = None,
scheme_type: str = None,
scheme_name: str = None,
bucket: str = "scheme_simulation_result",
):
"""
@@ -3237,8 +3237,8 @@ def store_scheme_simulation_result_to_influxdb(
:param link_result_list: (List[Dict[str, any]]): 包含连接和结果数据的字典列表。
:param scheme_start_time: (str): 方案模拟开始时间。
:param num_periods: (int): 方案模拟的周期数
:param scheme_Type: (str): 方案类型
:param scheme_Name: (str): 方案名称
:param scheme_type: (str): 方案类型
:param scheme_name: (str): 方案名称
:param bucket: (str): InfluxDB 的 bucket 名称,默认值为 "scheme_simulation_result"
:return:
"""
@@ -3298,8 +3298,8 @@ def store_scheme_simulation_result_to_influxdb(
Point("node")
.tag("date", date_str)
.tag("ID", node_id)
.tag("scheme_Type", scheme_Type)
.tag("scheme_Name", scheme_Name)
.tag("scheme_type", scheme_type)
.tag("scheme_name", scheme_name)
.field("head", data.get("head", 0.0))
.field("pressure", data.get("pressure", 0.0))
.field("actualdemand", data.get("demand", 0.0))
@@ -3322,8 +3322,8 @@ def store_scheme_simulation_result_to_influxdb(
Point("link")
.tag("date", date_str)
.tag("ID", link_id)
.tag("scheme_Type", scheme_Type)
.tag("scheme_Name", scheme_Name)
.tag("scheme_type", scheme_type)
.tag("scheme_name", scheme_name)
.field("flow", data.get("flow", 0.0))
.field("velocity", data.get("velocity", 0.0))
.field("headloss", data.get("headloss", 0.0))
@@ -3409,14 +3409,14 @@ def query_corresponding_query_id_and_element_id(name: str) -> None:
# 2025/03/11
def fill_scheme_simulation_result_to_SCADA(
scheme_Type: str = None,
scheme_Name: str = None,
scheme_type: str = None,
scheme_name: str = None,
query_date: str = None,
bucket: str = "scheme_simulation_result",
):
"""
:param scheme_Type: 方案类型
:param scheme_Name: 方案名称
:param scheme_type: 方案类型
:param scheme_name: 方案名称
:param query_date: 查询日期,格式为 'YYYY-MM-DD'
:param bucket: InfluxDB 的 bucket 名称,默认值为 "scheme_simulation_result"
:return:
@@ -3457,8 +3457,8 @@ def fill_scheme_simulation_result_to_SCADA(
# 查找associated_element_id的对应值
for key, value in globals.scheme_source_outflow_ids.items():
scheme_source_outflow_result = query_scheme_curve_by_ID_property(
scheme_Type=scheme_Type,
scheme_Name=scheme_Name,
scheme_type=scheme_type,
scheme_name=scheme_name,
query_date=query_date,
ID=value,
type="link",
@@ -3470,8 +3470,8 @@ def fill_scheme_simulation_result_to_SCADA(
Point("scheme_source_outflow")
.tag("date", query_date)
.tag("device_ID", key)
.tag("scheme_Type", scheme_Type)
.tag("scheme_Name", scheme_Name)
.tag("scheme_type", scheme_type)
.tag("scheme_name", scheme_name)
.field("monitored_value", data["value"])
.time(data["time"], write_precision="s")
)
@@ -3480,8 +3480,8 @@ def fill_scheme_simulation_result_to_SCADA(
for key, value in globals.scheme_pipe_flow_ids.items():
scheme_pipe_flow_result = query_scheme_curve_by_ID_property(
scheme_Type=scheme_Type,
scheme_Name=scheme_Name,
scheme_type=scheme_type,
scheme_name=scheme_name,
query_date=query_date,
ID=value,
type="link",
@@ -3492,8 +3492,8 @@ def fill_scheme_simulation_result_to_SCADA(
Point("scheme_pipe_flow")
.tag("date", query_date)
.tag("device_ID", key)
.tag("scheme_Type", scheme_Type)
.tag("scheme_Name", scheme_Name)
.tag("scheme_type", scheme_type)
.tag("scheme_name", scheme_name)
.field("monitored_value", data["value"])
.time(data["time"], write_precision="s")
)
@@ -3502,8 +3502,8 @@ def fill_scheme_simulation_result_to_SCADA(
for key, value in globals.scheme_pressure_ids.items():
scheme_pressure_result = query_scheme_curve_by_ID_property(
scheme_Type=scheme_Type,
scheme_Name=scheme_Name,
scheme_type=scheme_type,
scheme_name=scheme_name,
query_date=query_date,
ID=value,
type="node",
@@ -3514,8 +3514,8 @@ def fill_scheme_simulation_result_to_SCADA(
Point("scheme_pressure")
.tag("date", query_date)
.tag("device_ID", key)
.tag("scheme_Type", scheme_Type)
.tag("scheme_Name", scheme_Name)
.tag("scheme_type", scheme_type)
.tag("scheme_name", scheme_name)
.field("monitored_value", data["value"])
.time(data["time"], write_precision="s")
)
@@ -3524,8 +3524,8 @@ def fill_scheme_simulation_result_to_SCADA(
for key, value in globals.scheme_demand_ids.items():
scheme_demand_result = query_scheme_curve_by_ID_property(
scheme_Type=scheme_Type,
scheme_Name=scheme_Name,
scheme_type=scheme_type,
scheme_name=scheme_name,
query_date=query_date,
ID=value,
type="node",
@@ -3536,8 +3536,8 @@ def fill_scheme_simulation_result_to_SCADA(
Point("scheme_demand")
.tag("date", query_date)
.tag("device_ID", key)
.tag("scheme_Type", scheme_Type)
.tag("scheme_Name", scheme_Name)
.tag("scheme_type", scheme_type)
.tag("scheme_name", scheme_name)
.field("monitored_value", data["value"])
.time(data["time"], write_precision="s")
)
@@ -3546,8 +3546,8 @@ def fill_scheme_simulation_result_to_SCADA(
for key, value in globals.scheme_quality_ids.items():
scheme_quality_result = query_scheme_curve_by_ID_property(
scheme_Type=scheme_Type,
scheme_Name=scheme_Name,
scheme_type=scheme_type,
scheme_name=scheme_name,
query_date=query_date,
ID=value,
type="node",
@@ -3558,8 +3558,8 @@ def fill_scheme_simulation_result_to_SCADA(
Point("scheme_quality")
.tag("date", query_date)
.tag("device_ID", key)
.tag("scheme_Type", scheme_Type)
.tag("scheme_Name", scheme_Name)
.tag("scheme_type", scheme_type)
.tag("scheme_name", scheme_name)
.field("monitored_value", data["value"])
.time(data["time"], write_precision="s")
)
@@ -3629,15 +3629,15 @@ def query_SCADA_data_curve(
# 2025/02/18
def query_scheme_all_record_by_time(
scheme_Type: str,
scheme_Name: str,
scheme_type: str,
scheme_name: str,
query_time: str,
bucket: str = "scheme_simulation_result",
) -> tuple:
"""
查询指定方案某一时刻的所有记录包括node'link分别以指定格式返回。
:param scheme_Type: 方案类型
:param scheme_Name: 方案名称
:param scheme_type: 方案类型
:param scheme_name: 方案名称
:param query_time: 输入的北京时间,格式为 '2024-11-24T17:30:00+08:00'
:param bucket: 数据存储的 bucket 名称。
:return: dict: tuple: (node_records, link_records)
@@ -3660,7 +3660,7 @@ def query_scheme_all_record_by_time(
flux_query = f"""
from(bucket: "{bucket}")
|> range(start: {utc_start_time.isoformat()}, stop: {utc_stop_time.isoformat()})
|> filter(fn: (r) => r["scheme_Type"] == "{scheme_Type}" and r["scheme_Name"] == "{scheme_Name}" and r["_measurement"] == "node" or r["_measurement"] == "link")
|> filter(fn: (r) => r["scheme_type"] == "{scheme_type}" and r["scheme_name"] == "{scheme_name}" and r["_measurement"] == "node" or r["_measurement"] == "link")
|> pivot(
rowKey:["_time"],
columnKey:["_field"],
@@ -3710,8 +3710,8 @@ def query_scheme_all_record_by_time(
# 2025/03/04
def query_scheme_all_record_by_time_property(
scheme_Type: str,
scheme_Name: str,
scheme_type: str,
scheme_name: str,
query_time: str,
type: str,
property: str,
@@ -3719,8 +3719,8 @@ def query_scheme_all_record_by_time_property(
) -> list:
"""
查询指定方案某一时刻node'link某一属性值以指定格式返回。
:param scheme_Type: 方案类型
:param scheme_Name: 方案名称
:param scheme_type: 方案类型
:param scheme_name: 方案名称
:param query_time: 输入的北京时间,格式为 '2024-11-24T17:30:00+08:00'
:param type: 查询的类型(决定 measurement
:param property: 查询的字段名称field
@@ -3752,7 +3752,7 @@ def query_scheme_all_record_by_time_property(
flux_query = f"""
from(bucket: "{bucket}")
|> range(start: {utc_start_time.isoformat()}, stop: {utc_stop_time.isoformat()})
|> filter(fn: (r) => r["scheme_Type"] == "{scheme_Type}" and r["scheme_Name"] == "{scheme_Name}" and r["_measurement"] == "{measurement}" and r["_field"] == "{property}")
|> filter(fn: (r) => r["scheme_type"] == "{scheme_type}" and r["scheme_name"] == "{scheme_name}" and r["_measurement"] == "{measurement}" and r["_field"] == "{property}")
"""
# 执行查询
tables = query_api.query(flux_query)
@@ -3767,8 +3767,8 @@ def query_scheme_all_record_by_time_property(
# 2025/02/19
def query_scheme_curve_by_ID_property(
scheme_Type: str,
scheme_Name: str,
scheme_type: str,
scheme_name: str,
query_date: str,
ID: str,
type: str,
@@ -3776,9 +3776,9 @@ def query_scheme_curve_by_ID_property(
bucket: str = "scheme_simulation_result",
) -> list:
"""
根据scheme_Type和scheme_Name,查询该模拟方案中某一node或link的某一属性值的所有时间的结果
:param scheme_Type: 方案类型
:param scheme_Name: 方案名称
根据scheme_Type和scheme_name,查询该模拟方案中某一node或link的某一属性值的所有时间的结果
:param scheme_type: 方案类型
:param scheme_name: 方案名称
:param query_date: 查询日期,格式为 'YYYY-MM-DD'
:param ID: 元素的ID
:param type: 元素的类型node或link
@@ -3817,7 +3817,7 @@ def query_scheme_curve_by_ID_property(
flux_query = f"""
from(bucket: "{bucket}")
|> range(start: {start_time}, stop: {stop_time})
|> filter(fn: (r) => r["_measurement"] == "{measurement}" and r["scheme_Type"] == "{scheme_Type}" and r["scheme_Name"] == "{scheme_Name}" and r["ID"] == "{ID}" and r["_field"] == "{property}")
|> filter(fn: (r) => r["_measurement"] == "{measurement}" and r["scheme_type"] == "{scheme_type}" and r["scheme_name"] == "{scheme_name}" and r["ID"] == "{ID}" and r["_field"] == "{property}")
"""
# 执行查询
tables = query_api.query(flux_query)
@@ -3832,15 +3832,15 @@ def query_scheme_curve_by_ID_property(
# 2025/02/21
def query_scheme_all_record(
scheme_Type: str,
scheme_Name: str,
scheme_type: str,
scheme_name: str,
query_date: str,
bucket: str = "scheme_simulation_result",
) -> tuple:
"""
查询指定方案的所有记录包括node'link分别以指定格式返回。
:param scheme_Type: 方案类型
:param scheme_Name: 方案名称
:param scheme_type: 方案类型
:param scheme_name: 方案名称
:param query_date: 查询日期,格式为 'YYYY-MM-DD'
:param bucket: 数据存储的 bucket 名称。
:return: dict: tuple: (node_records, link_records)
@@ -3867,7 +3867,7 @@ def query_scheme_all_record(
flux_query = f"""
from(bucket: "{bucket}")
|> range(start: {utc_start_time.isoformat()}, stop: {utc_stop_time.isoformat()})
|> filter(fn: (r) => r["scheme_Type"] == "{scheme_Type}" and r["scheme_Name"] == "{scheme_Name}" and r["_measurement"] == "node" or r["_measurement"] == "link")
|> filter(fn: (r) => r["scheme_type"] == "{scheme_type}" and r["scheme_name"] == "{scheme_name}" and r["_measurement"] == "node" or r["_measurement"] == "link")
|> pivot(
rowKey:["_time"],
columnKey:["_field"],
@@ -3917,8 +3917,8 @@ def query_scheme_all_record(
# 2025/03/04
def query_scheme_all_record_property(
scheme_Type: str,
scheme_Name: str,
scheme_type: str,
scheme_name: str,
query_date: str,
type: str,
property: str,
@@ -3926,8 +3926,8 @@ def query_scheme_all_record_property(
) -> list:
"""
查询指定方案的node'link的某一属性值以指定格式返回。
:param scheme_Type: 方案类型
:param scheme_Name: 方案名称
:param scheme_type: 方案类型
:param scheme_name: 方案名称
:param query_date: 查询日期,格式为 'YYYY-MM-DD'
:param type: 查询的类型(决定 measurement
:param property: 查询的字段名称field
@@ -3964,7 +3964,7 @@ def query_scheme_all_record_property(
flux_query = f"""
from(bucket: "{bucket}")
|> range(start: {start_time}, stop: {stop_time})
|> filter(fn: (r) => r["scheme_Type"] == "{scheme_Type}" and r["scheme_Name"] == "{scheme_Name}" and r["date"] == "{query_date}" and r["_measurement"] == "{measurement}" and r["_field"] == "{property}")
|> filter(fn: (r) => r["scheme_type"] == "{scheme_type}" and r["scheme_name"] == "{scheme_name}" and r["date"] == "{query_date}" and r["_measurement"] == "{measurement}" and r["_field"] == "{property}")
"""
# 执行查询
tables = query_api.query(flux_query)
@@ -4245,8 +4245,8 @@ def export_scheme_simulation_result_to_csv_time(
link_data[key][field] = record.get_value()
link_data[key]["measurement"] = record.get_measurement()
link_data[key]["date"] = record.values.get("date", None)
link_data[key]["scheme_Type"] = record.values.get("scheme_Type", None)
link_data[key]["scheme_Name"] = record.values.get("scheme_Name", None)
link_data[key]["scheme_type"] = record.values.get("scheme_type", None)
link_data[key]["scheme_name"] = record.values.get("scheme_name", None)
# 构建 Flux 查询语句,查询指定时间范围内的数据
flux_query_node = f"""
from(bucket: "{bucket}")
@@ -4267,8 +4267,8 @@ def export_scheme_simulation_result_to_csv_time(
node_data[key][field] = record.get_value()
node_data[key]["measurement"] = record.get_measurement()
node_data[key]["date"] = record.values.get("date", None)
node_data[key]["scheme_Type"] = record.values.get("scheme_Type", None)
node_data[key]["scheme_Name"] = record.values.get("scheme_Name", None)
node_data[key]["scheme_type"] = record.values.get("scheme_type", None)
node_data[key]["scheme_name"] = record.values.get("scheme_name", None)
for key in set(link_data.keys()):
row = {"time": key[0], "ID": key[1]}
row.update(link_data.get(key, {}))
@@ -4288,8 +4288,8 @@ def export_scheme_simulation_result_to_csv_time(
"time",
"measurement",
"date",
"scheme_Type",
"scheme_Name",
"scheme_type",
"scheme_name",
"ID",
"flow",
"leakage",
@@ -4311,8 +4311,8 @@ def export_scheme_simulation_result_to_csv_time(
"time",
"measurement",
"date",
"scheme_Type",
"scheme_Name",
"scheme_type",
"scheme_name",
"ID",
"head",
"pressure",
@@ -4330,15 +4330,15 @@ def export_scheme_simulation_result_to_csv_time(
# 2025/02/18
def export_scheme_simulation_result_to_csv_scheme(
scheme_Type: str,
scheme_Name: str,
scheme_type: str,
scheme_name: str,
query_date: str,
bucket: str = "scheme_simulation_result",
) -> None:
"""
导出influxdb中scheme_simulation_result这个bucket的数据到csv中
:param scheme_Type: 查询的方案类型
:param scheme_Name: 查询的方案名
:param scheme_type: 查询的方案类型
:param scheme_name: 查询的方案名
:param query_date: 查询日期,格式为 'YYYY-MM-DD'
:param bucket: 数据存储的 bucket 名称,默认值为 "SCADA_data"
:return:
@@ -4366,7 +4366,7 @@ def export_scheme_simulation_result_to_csv_scheme(
flux_query_link = f"""
from(bucket: "{bucket}")
|> range(start: {start_time}, stop: {stop_time})
|> filter(fn: (r) => r["_measurement"] == "link" and r["scheme_Type"] == "{scheme_Type}" and r["scheme_Name"] == "{scheme_Name}")
|> filter(fn: (r) => r["_measurement"] == "link" and r["scheme_type"] == "{scheme_type}" and r["scheme_name"] == "{scheme_name}")
"""
# 执行查询
link_tables = query_api.query(flux_query_link)
@@ -4382,13 +4382,13 @@ def export_scheme_simulation_result_to_csv_scheme(
link_data[key][field] = record.get_value()
link_data[key]["measurement"] = record.get_measurement()
link_data[key]["date"] = record.values.get("date", None)
link_data[key]["scheme_Type"] = record.values.get("scheme_Type", None)
link_data[key]["scheme_Name"] = record.values.get("scheme_Name", None)
link_data[key]["scheme_type"] = record.values.get("scheme_type", None)
link_data[key]["scheme_name"] = record.values.get("scheme_name", None)
# 构建 Flux 查询语句,查询指定时间范围内的数据
flux_query_node = f"""
from(bucket: "{bucket}")
|> range(start: {start_time}, stop: {stop_time})
|> filter(fn: (r) => r["_measurement"] == "node" and r["scheme_Type"] == "{scheme_Type}" and r["scheme_Name"] == "{scheme_Name}")
|> filter(fn: (r) => r["_measurement"] == "node" and r["scheme_type"] == "{scheme_type}" and r["scheme_name"] == "{scheme_name}")
"""
# 执行查询
node_tables = query_api.query(flux_query_node)
@@ -4404,8 +4404,8 @@ def export_scheme_simulation_result_to_csv_scheme(
node_data[key][field] = record.get_value()
node_data[key]["measurement"] = record.get_measurement()
node_data[key]["date"] = record.values.get("date", None)
node_data[key]["scheme_Type"] = record.values.get("scheme_Type", None)
node_data[key]["scheme_Name"] = record.values.get("scheme_Name", None)
node_data[key]["scheme_type"] = record.values.get("scheme_type", None)
node_data[key]["scheme_name"] = record.values.get("scheme_name", None)
for key in set(link_data.keys()):
row = {"time": key[0], "ID": key[1]}
row.update(link_data.get(key, {}))
@@ -4416,10 +4416,10 @@ def export_scheme_simulation_result_to_csv_scheme(
node_rows.append(row)
# 动态生成 CSV 文件名
csv_filename_link = (
f"scheme_simulation_link_result_{scheme_Name}_of_{scheme_Type}.csv"
f"scheme_simulation_link_result_{scheme_name}_of_{scheme_type}.csv"
)
csv_filename_node = (
f"scheme_simulation_node_result_{scheme_Name}_of_{scheme_Type}.csv"
f"scheme_simulation_node_result_{scheme_name}_of_{scheme_type}.csv"
)
# 写入到 CSV 文件
with open(csv_filename_link, mode="w", newline="") as file:
@@ -4429,8 +4429,8 @@ def export_scheme_simulation_result_to_csv_scheme(
"time",
"measurement",
"date",
"scheme_Type",
"scheme_Name",
"scheme_type",
"scheme_name",
"ID",
"flow",
"leakage",
@@ -4452,8 +4452,8 @@ def export_scheme_simulation_result_to_csv_scheme(
"time",
"measurement",
"date",
"scheme_Type",
"scheme_Name",
"scheme_type",
"scheme_name",
"ID",
"head",
"pressure",
@@ -4878,15 +4878,15 @@ if __name__ == "__main__":
# export_scheme_simulation_result_to_csv_time(start_date='2025-02-13', end_date='2025-02-15')
# 示例9export_scheme_simulation_result_to_csv_scheme
# export_scheme_simulation_result_to_csv_scheme(scheme_Type='burst_Analysis', scheme_Name='scheme1', query_date='2025-03-10')
# export_scheme_simulation_result_to_csv_scheme(scheme_type='burst_Analysis', scheme_name='scheme1', query_date='2025-03-10')
# 示例10query_scheme_all_record_by_time
# node_records, link_records = query_scheme_all_record_by_time(scheme_Type='burst_Analysis', scheme_Name='scheme1', query_time="2025-02-14T10:30:00+08:00")
# node_records, link_records = query_scheme_all_record_by_time(scheme_type='burst_Analysis', scheme_name='scheme1', query_time="2025-02-14T10:30:00+08:00")
# print("Node 数据:", node_records)
# print("Link 数据:", link_records)
# 示例11query_scheme_curve_by_ID_property
# curve_result = query_scheme_curve_by_ID_property(scheme_Type='burst_Analysis', scheme_Name='scheme1', ID='ZBBDTZDP000022',
# curve_result = query_scheme_curve_by_ID_property(scheme_type='burst_Analysis', scheme_name='scheme1', ID='ZBBDTZDP000022',
# type='node', property='head')
# print(curve_result)
@@ -4896,7 +4896,7 @@ if __name__ == "__main__":
# print("Link 数据:", link_records)
# 示例13query_scheme_all_record
# node_records, link_records = query_scheme_all_record(scheme_Type='burst_Analysis', scheme_Name='scheme1', query_date='2025-03-10')
# node_records, link_records = query_scheme_all_record(scheme_type='burst_Analysis', scheme_name='scheme1', query_date='2025-03-10')
# print("Node 数据:", node_records)
# print("Link 数据:", link_records)
@@ -4909,16 +4909,16 @@ if __name__ == "__main__":
# print(result_records)
# 示例16query_scheme_all_record_by_time_property
# result_records = query_scheme_all_record_by_time_property(scheme_Type='burst_Analysis', scheme_Name='scheme1',
# result_records = query_scheme_all_record_by_time_property(scheme_type='burst_Analysis', scheme_name='scheme1',
# query_time='2025-02-14T10:30:00+08:00', type='node', property='head')
# print(result_records)
# 示例17query_scheme_all_record_property
# result_records = query_scheme_all_record_property(scheme_Type='burst_Analysis', scheme_Name='scheme1', query_date='2025-03-10', type='node', property='head')
# result_records = query_scheme_all_record_property(scheme_type='burst_Analysis', scheme_name='scheme1', query_date='2025-03-10', type='node', property='head')
# print(result_records)
# 示例18fill_scheme_simulation_result_to_SCADA
# fill_scheme_simulation_result_to_SCADA(scheme_Type='burst_Analysis', scheme_Name='burst0330', query_date='2025-03-30')
# fill_scheme_simulation_result_to_SCADA(scheme_type='burst_Analysis', scheme_name='burst0330', query_date='2025-03-30')
# 示例19query_SCADA_data_by_device_ID_and_timerange
# result = query_SCADA_data_by_device_ID_and_timerange(query_ids_list=globals.pressure_non_realtime_ids, start_time='2025-04-16T00:00:00+08:00',
@@ -4926,7 +4926,7 @@ if __name__ == "__main__":
# print(result)
# 示例manually_get_burst_flow
# leakage = manually_get_burst_flow(scheme_Type='burst_Analysis', scheme_Name='burst_scheme', scheme_start_time='2025-03-10T12:00:00+08:00')
# leakage = manually_get_burst_flow(scheme_type='burst_Analysis', scheme_name='burst_scheme', scheme_start_time='2025-03-10T12:00:00+08:00')
# print(leakage)
# 示例upload_cleaned_SCADA_data_to_influxdb

View File

@@ -1249,7 +1249,7 @@ def run_simulation(
endtime = time.time()
logging.info("store time: %f", endtime - starttime)
# 暂不需要再次存储 SCADA 模拟信息
# TimescaleInternalQueries.fill_scheme_simulation_result_to_SCADA(scheme_Type=scheme_Type, scheme_Name=scheme_Name)
# TimescaleInternalQueries.fill_scheme_simulation_result_to_SCADA(scheme_type=scheme_type, scheme_name=scheme_name)
# if simulation_type.upper() == "REALTIME":
# influxdb_api.store_realtime_simulation_result_to_influxdb(
@@ -1261,11 +1261,11 @@ def run_simulation(
# link_result,
# modify_pattern_start_time,
# num_periods_result,
# scheme_Type,
# scheme_Name,
# scheme_type,
# scheme_name,
# )
# 暂不需要再次存储 SCADA 模拟信息
# influxdb_api.fill_scheme_simulation_result_to_SCADA(scheme_Type=scheme_Type, scheme_Name=scheme_Name)
# influxdb_api.fill_scheme_simulation_result_to_SCADA(scheme_type=scheme_type, scheme_name=scheme_name)
print("after store result")
@@ -1345,7 +1345,7 @@ if __name__ == "__main__":
# run_simulation(name='bb', simulation_type="realtime", modify_pattern_start_time='2025-02-25T23:45:00+08:00')
# 模拟示例2
# run_simulation(name='bb', simulation_type="extended", modify_pattern_start_time='2025-03-10T12:00:00+08:00',
# modify_total_duration=1800, scheme_Type="burst_Analysis", scheme_Name="scheme1")
# modify_total_duration=1800, scheme_type="burst_Analysis", scheme_name="scheme1")
# 查询示例1query_SCADA_ID_corresponding_info
# result = query_SCADA_ID_corresponding_info(name='bb', SCADA_ID='P10755')

View File

@@ -4,6 +4,8 @@ from app.algorithms.valve_isolation import valve_isolation_analysis
def analyze_valve_isolation(
network: str, accident_element: str | list[str]
network: str,
accident_element: str | list[str],
disabled_valves: list[str] = None,
) -> dict[str, Any]:
return valve_isolation_analysis(network, accident_element)
return valve_isolation_analysis(network, accident_element, disabled_valves)

View File

@@ -3233,7 +3233,7 @@ async def fastapi_query_all_scheme_all_records(
return loaded_dict
results = influxdb_api.query_scheme_all_record(
scheme_Type=schemetype, scheme_Name=schemename, query_date=querydate
scheme_type=schemetype, scheme_name=schemename, query_date=querydate
)
packed = msgpack.packb(results, default=encode_datetime)
redis_client.set(cache_key, packed)
@@ -3257,7 +3257,7 @@ async def fastapi_query_all_scheme_all_records_property(
all_results = msgpack.unpackb(data, object_hook=decode_datetime)
else:
all_results = influxdb_api.query_scheme_all_record(
scheme_Type=schemetype, scheme_Name=schemename, query_date=querydate
scheme_type=schemetype, scheme_name=schemename, query_date=querydate
)
packed = msgpack.packb(all_results, default=encode_datetime)
redis_client.set(cache_key, packed)
@@ -3585,7 +3585,7 @@ class BurstAnalysis(BaseModel):
modify_fixed_pump_pattern: Optional[dict[str, list]] = None
modify_variable_pump_pattern: Optional[dict[str, list]] = None
modify_valve_opening: Optional[dict[str, float]] = None
scheme_Name: Optional[str] = None
scheme_name: Optional[str] = None
@app.post("/burst_analysis/")
@@ -3608,7 +3608,7 @@ async def fastapi_burst_analysis(data: BurstAnalysis) -> str:
modify_fixed_pump_pattern=item["modify_fixed_pump_pattern"],
modify_variable_pump_pattern=item["modify_variable_pump_pattern"],
modify_valve_opening=item["modify_valve_opening"],
scheme_Name=item["scheme_Name"],
scheme_name=item["scheme_name"],
)
# os.rename(filename2, filename)
@@ -3616,7 +3616,7 @@ async def fastapi_burst_analysis(data: BurstAnalysis) -> str:
# 将 时间转换成日期,然后缓存这个计算结果
# 缓存key: burst_analysis_<name>_<modify_pattern_start_time>
global redis_client
schemename = data.scheme_Name
schemename = data.scheme_name
print(data.modify_pattern_start_time)
@@ -3627,7 +3627,7 @@ async def fastapi_burst_analysis(data: BurstAnalysis) -> str:
cache_key = f"queryallschemeallrecords_burst_Analysis_{schemename}_{querydate}"
data = redis_client.get(cache_key)
if not data:
results = influxdb_api.query_scheme_all_record("burst_Analysis", scheme_Name=schemename, query_date=querydate)
results = influxdb_api.query_scheme_all_record("burst_Analysis", scheme_name=schemename, query_date=querydate)
packed = msgpack.packb(results, default=encode_datetime)
redis_client.set(cache_key, packed)
"""
@@ -3712,7 +3712,7 @@ async def fastapi_contaminant_simulation(
concentration: float,
duration: int,
pattern: str = None,
scheme_Name: str = None,
scheme_name: str = None,
) -> str:
filename = "c:/lock.simulation"
filename2 = "c:/lock.simulation2"

View File

@@ -68,7 +68,7 @@ def burst_analysis(
:param modify_fixed_pump_pattern: dict中包含多个水泵模式str为工频水泵的idlist为修改后的pattern
:param modify_variable_pump_pattern: dict中包含多个水泵模式str为变频水泵的idlist为修改后的pattern
:param modify_valve_opening: dict中包含多个阀门开启度str为阀门的idfloat为修改后的阀门开启度
:param scheme_Name: 方案名称
:param scheme_name: 方案名称
:return:
"""
scheme_detail: dict = {
@@ -294,7 +294,7 @@ def flushing_analysis(
modify_valve_opening: dict[str, float] = None,
drainage_node_ID: str = None,
flushing_flow: float = 0,
scheme_Name: str = None,
scheme_name: str = None,
) -> None:
"""
管道冲洗模拟
@@ -304,7 +304,7 @@ def flushing_analysis(
:param modify_valve_opening: dict中包含多个阀门开启度str为阀门的idfloat为修改后的阀门开启度
:param drainage_node_ID: 冲洗排放口所在节点ID
:param flushing_flow: 冲洗水量传入参数单位为m3/h
:param scheme_Name: 方案名称
:param scheme_name: 方案名称
:return:
"""
print(
@@ -396,8 +396,8 @@ def flushing_analysis(
modify_pattern_start_time=modify_pattern_start_time,
modify_total_duration=modify_total_duration,
modify_valve_opening=modify_valve_opening,
scheme_Type="flushing_Analysis",
scheme_Name=scheme_Name,
scheme_type="flushing_Analysis",
scheme_name=scheme_name,
)
# step 4. restore the base model
if is_project_open(new_name):
@@ -417,7 +417,7 @@ def contaminant_simulation(
source: str = None,# 污染源节点ID
concentration: float = None, # 污染源浓度单位mg/L
source_pattern: str = None, # 污染源时间变化模式名称
scheme_Name: str = None,
scheme_name: str = None,
) -> None:
"""
污染模拟
@@ -428,7 +428,7 @@ def contaminant_simulation(
:param concentration: 污染源位置处的浓度单位mg/L即默认的污染模拟setting为concentration应改为 Set point booster
:param source_pattern: 污染源的时间变化模式若不传入则默认以恒定浓度持续模拟时间长度等于duration;
若传入,则格式为{1.0,0.5,1.1}等系数列表pattern_step模拟等于模型的hydraulic time step
:param scheme_Name: 方案名称
:param scheme_name: 方案名称
:return:
"""
print(
@@ -533,8 +533,8 @@ def contaminant_simulation(
simulation_type="extended",
modify_pattern_start_time=modify_pattern_start_time,
modify_total_duration=modify_total_duration,
scheme_Type="contaminant_Analysis",
scheme_Name=scheme_Name,
scheme_type="contaminant_Analysis",
scheme_name=scheme_name,
)
# for i in range(1,operation_step):
@@ -630,7 +630,7 @@ def pressure_regulation(
modify_tank_initial_level: dict[str, float] = None,
modify_fixed_pump_pattern: dict[str, list] = None,
modify_variable_pump_pattern: dict[str, list] = None,
scheme_Name: str = None,
scheme_name: str = None,
) -> None:
"""
区域调压模拟用来模拟未来15分钟内开关水泵对区域压力的影响
@@ -640,7 +640,7 @@ def pressure_regulation(
:param modify_tank_initial_level: dict中包含多个水塔str为水塔的idfloat为修改后的initial_level
:param modify_fixed_pump_pattern: dict中包含多个水泵模式str为工频水泵的idlist为修改后的pattern
:param modify_variable_pump_pattern: dict中包含多个水泵模式str为变频水泵的idlist为修改后的pattern
:param scheme_Name: 模拟方案名称
:param scheme_name: 模拟方案名称
:return:
"""
print(
@@ -692,8 +692,8 @@ def pressure_regulation(
modify_tank_initial_level=modify_tank_initial_level,
modify_fixed_pump_pattern=modify_fixed_pump_pattern,
modify_variable_pump_pattern=modify_variable_pump_pattern,
scheme_Type="pressure_regulation",
scheme_Name=scheme_Name,
scheme_type="pressure_regulation",
scheme_name=scheme_name,
)
if is_project_open(new_name):
close_project(new_name)
@@ -1536,7 +1536,7 @@ if __name__ == "__main__":
# 示例1burst_analysis
# burst_analysis(name='bb', modify_pattern_start_time='2025-04-17T00:00:00+08:00',
# burst_ID='GSD230112144241FA18292A84CB', burst_size=400, modify_total_duration=1800, scheme_Name='GSD230112144241FA18292A84CB_400')
# burst_ID='GSD230112144241FA18292A84CB', burst_size=400, modify_total_duration=1800, scheme_name='GSD230112144241FA18292A84CB_400')
# 示例create_user
# create_user(name=project_info.name, username='tjwater dev', password='123456')