From 6b85cfc66640791972b19f6a0916a704550c3627 Mon Sep 17 00:00:00 2001 From: Jiang Date: Mon, 9 Mar 2026 11:30:22 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9B=B4=E6=96=B0=E6=96=87=E6=A1=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- QUICK_START_GUIDE.md | 326 ++++++++++ SECONDARY_DEVELOPMENT_GUIDE.md | 1015 ++++++++++++++++++++++++++++++++ 2 files changed, 1341 insertions(+) create mode 100644 QUICK_START_GUIDE.md create mode 100644 SECONDARY_DEVELOPMENT_GUIDE.md diff --git a/QUICK_START_GUIDE.md b/QUICK_START_GUIDE.md new file mode 100644 index 0000000..2cd3592 --- /dev/null +++ b/QUICK_START_GUIDE.md @@ -0,0 +1,326 @@ +# TJWater ServerBinary - Quick Reference for Secondary Development + +## 1️⃣ Adding a New API Endpoint (30 seconds) + +### File: `app/api/v1/endpoints/my_feature.py` +```python +from fastapi import APIRouter, Depends +from app.auth.dependencies import get_current_active_user +from app.domain.schemas.user import UserInDB + +router = APIRouter() + +@router.get("/list") +async def list_items(user: UserInDB = Depends(get_current_active_user)): + return {"items": []} + +@router.post("/create") +async def create_item(data: dict, user: UserInDB = Depends(get_current_active_user)): + return {"status": "created"} +``` + +### File: `app/api/v1/router.py` (Add 2 lines) +```python +from app.api.v1.endpoints import my_feature + +api_router.include_router(my_feature.router, prefix="/items", tags=["Items"]) +``` + +**Done!** Endpoint available at `POST /api/v1/items/create` + +--- + +## 2️⃣ Database Query Pattern + +### Using Repository +```python +from app.infra.repositories.user_repository import UserRepository +from app.auth.dependencies import get_user_repository + +@router.get("/user/{user_id}") +async def get_user(user_id: int, user_repo: UserRepository = Depends(get_user_repository)): + user = await user_repo.get_user_by_id(user_id) + return user +``` + +### Direct Query (if no repository exists) +```python +async with db.get_connection() as conn: + async with conn.cursor() as cur: + await cur.execute("SELECT * FROM users WHERE id = %s", (user_id,)) + row = await cur.fetchone() + return dict(row) if row else None +``` + +--- + +## 3️⃣ Role-Based Access Control + +```python +from app.auth.permissions import require_admin, require_role +from app.domain.models.role import UserRole + +# Admin only +@router.delete("/users/{user_id}") +async def delete_user(user_id: int, admin: UserInDB = Depends(require_admin)): + pass + +# Operator or higher +@router.post("/projects/") +async def create_project(user: UserInDB = Depends(require_role(UserRole.OPERATOR))): + pass + +# Custom role check +def require_viewer(): + return require_role(UserRole.VIEWER) + +@router.get("/reports") +async def get_reports(user: UserInDB = Depends(require_viewer)): + pass +``` + +--- + +## 4️⃣ Call Native (Binary) Code + +```python +from app.native.api import get_all_junctions, add_junction, set_junction + +# Read network elements +junctions = get_all_junctions() + +# Create +add_junction({"id": "J1", "x": 0.0, "y": 0.0, "demand": 100.0}) + +# Update +set_junction("J1", {"demand": 150.0}) + +# Get constants +from app.native.api import PIPE_STATUS_OPEN, OPTION_UNITS_LPS +``` + +--- + +## 5️⃣ Encryption & Security + +### Encrypt Sensitive Data +```python +from app.core.encryption import get_encryptor + +encryptor = get_encryptor() + +encrypted = encryptor.encrypt("secret_password") +decrypted = encryptor.decrypt(encrypted) +``` + +### Verify Password +```python +from app.core.security import verify_password, get_password_hash + +# When creating user +hashed = get_password_hash(user.password) + +# When checking login +if verify_password(form_data.password, stored_hash): + # Correct password + pass +``` + +--- + +## 6️⃣ Configuration + +### In `.env` file +```bash +SECRET_KEY=your-secret-key +ENCRYPTION_KEY=your-fernet-key +DB_HOST=localhost +DB_PORT=5432 +``` + +### Use in Code +```python +from app.core.config import settings + +db_url = settings.SQLALCHEMY_DATABASE_URI +token_minutes = settings.ACCESS_TOKEN_EXPIRE_MINUTES +``` + +--- + +## 7️⃣ Testing + +```python +# tests/unit/test_my_feature.py +import pytest + +def test_my_function(): + result = my_function() + assert result == expected + +# Run tests +pytest tests/ -v +pytest tests/unit/test_my_feature.py::test_my_function -v +``` + +--- + +## 🗺️ Directory Quick Map + +| Path | What | Use For | +|------|------|---------| +| `app/api/v1/endpoints/` | API routes | Add new endpoints | +| `app/auth/` | Auth logic | Login, permissions | +| `app/domain/` | Models/Schemas | Data structures | +| `app/infra/db/` | DB access | Queries, repositories | +| `app/services/` | Business logic | Complex operations | +| `app/native/api/` | Binary functions | Network simulation | +| `app/core/` | Core utilities | Config, security, encryption | +| `tests/` | Tests | Unit & integration tests | + +--- + +## 🔑 Key Imports + +```python +# Authentication +from app.auth.dependencies import get_current_active_user, get_user_repository +from app.auth.permissions import require_admin, require_role + +# Database +from app.infra.db.postgresql.database import db, get_database_instance +from app.infra.repositories.user_repository import UserRepository + +# Schemas & Models +from app.domain.schemas.user import UserInDB, UserCreate, Token +from app.domain.models.role import UserRole + +# Security +from app.core.security import create_access_token, verify_password, get_password_hash +from app.core.encryption import get_encryptor + +# Config +from app.core.config import settings + +# Native API +from app.native.api import get_all_junctions, add_junction, open_project +``` + +--- + +## 🚀 Common Tasks + +### Register New User +```python +from app.infra.repositories.user_repository import UserRepository +from app.domain.schemas.user import UserCreate +from app.domain.models.role import UserRole + +user_data = UserCreate( + username="john", + email="john@example.com", + password="securepass", + role=UserRole.USER +) +new_user = await user_repo.create_user(user_data) +``` + +### Query Database +```python +async with db.get_connection() as conn: + async with conn.cursor() as cur: + await cur.execute("SELECT * FROM users LIMIT 10") + rows = await cur.fetchall() + return [dict(r) for r in rows] +``` + +### Run Simulation +```python +from app.services.simulation import run_simulation + +result = await run_simulation( + project_name="szh", + duration=3600, + time_step=60 +) +``` + +### Audit Operation +```python +from app.core.audit import log_audit_event, AuditAction + +await log_audit_event( + action=AuditAction.CREATE, + resource_type="pipe", + resource_id="P1", + actor_id=user.id, + details={"diameter": 100, "material": "PVC"} +) +``` + +--- + +## ⚠️ Common Gotchas + +1. **Always use `Depends()`** for injecting repositories, users + ```python + # ✅ Right + async def endpoint(user: UserInDB = Depends(get_current_active_user)): + + # ❌ Wrong + async def endpoint(user: UserInDB): + ``` + +2. **Use parametrized queries** to prevent SQL injection + ```python + # ✅ Right + await cur.execute("SELECT * FROM users WHERE id = %s", (user_id,)) + + # ❌ Wrong + await cur.execute(f"SELECT * FROM users WHERE id = {user_id}") + ``` + +3. **Async/await** required for database operations + ```python + # ✅ Right + user = await user_repo.get_user_by_id(1) + + # ❌ Wrong + user = user_repo.get_user_by_id(1) # Missing await + ``` + +4. **Role hierarchy** for permission checks + ```python + # All levels include higher permissions + VIEWER (1) < USER (2) < OPERATOR (3) < ADMIN (4) + ``` + +--- + +## 📞 File Locations Reference + +- **Main app entry**: `app/main.py` (83 lines) +- **Central router**: `app/api/v1/router.py` (98 lines) +- **Config**: `app/core/config.py` (82 lines) +- **Auth dependencies**: `app/auth/dependencies.py` +- **Permissions**: `app/auth/permissions.py` +- **Database**: `app/infra/db/postgresql/database.py` +- **Repositories**: `app/infra/repositories/` +- **Schemas**: `app/domain/schemas/` +- **Models**: `app/domain/models/` + +--- + +## 📚 Full Documentation + +See **SECONDARY_DEVELOPMENT_GUIDE.md** for comprehensive guide on: +- Architecture overview +- Detailed endpoint creation +- Database patterns +- Native module integration +- Testing setup +- Deployment + +--- + +*Generated for TJWater ServerBinary secondary development team* diff --git a/SECONDARY_DEVELOPMENT_GUIDE.md b/SECONDARY_DEVELOPMENT_GUIDE.md new file mode 100644 index 0000000..98a2f9d --- /dev/null +++ b/SECONDARY_DEVELOPMENT_GUIDE.md @@ -0,0 +1,1015 @@ +# TJWater ServerBinary - Secondary Development Documentation Guide + +## Executive Summary + +TJWaterServerBinary is a **FastAPI-based water distribution network management system** with integrated EPANET simulation, SCADA data management, and comprehensive security architecture. This guide identifies key components and patterns essential for secondary development. + +--- + +## 1. Project Structure & Key Directories + +### Overall Architecture +``` +TJWaterServerBinary/ +├── app/ +│ ├── main.py # FastAPI application entry + lifecycle management +│ ├── api/v1/ # API routes and endpoints +│ ├── algorithms/ # Core algorithms (simulation, leakage detection, etc.) +│ ├── auth/ # Authentication logic and dependencies +│ ├── core/ # Core configuration, security, encryption +│ ├── domain/ # Domain models and Pydantic schemas +│ ├── infra/ # Infrastructure layer (DB, cache, audit) +│ ├── services/ # Business logic services +│ ├── native/ # Native module integration (binary/compiled code) +│ └── utils/ # Utility functions +├── infra/docker/ # Docker and deployment configs +├── resources/sql/ # SQL initialization scripts +├── tests/ # Unit and integration tests +└── requirements.txt # Python dependencies +``` + +### Key Directory Details + +| Directory | Purpose | Key Files | +|-----------|---------|-----------| +| **api/v1** | REST API endpoints organized by domain | `router.py` (centralized registration), `endpoints/` (individual controllers) | +| **domain** | Data models and schemas | `models/role.py`, `schemas/user.py`, `schemas/audit.py` | +| **infra** | Database access, caching, audit | `db/postgresql/`, `db/timescaledb/`, `cache/`, `audit/` | +| **auth** | JWT/OAuth2 validation, permissions | `dependencies.py`, `permissions.py`, `keycloak_dependencies.py` | +| **core** | Security, encryption, config | `config.py`, `security.py`, `encryption.py`, `audit.py` | +| **native** | Python-to-binary integration | `api/` module wrapping compiled code | +| **services** | Business logic & orchestration | `tjnetwork.py` (project management), `simulation.py`, etc. | + +--- + +## 2. API Endpoint Addition (Router Registration) + +### How to Add New API Endpoints + +#### Step 1: Create Endpoint File +Create a new file in `app/api/v1/endpoints/` with your endpoint handlers: + +```python +# app/api/v1/endpoints/my_feature.py +from fastapi import APIRouter, Depends, HTTPException, status +from app.auth.dependencies import get_current_active_user +from app.domain.schemas.user import UserInDB +from app.infra.repositories.my_repository import MyRepository +from app.auth.dependencies import get_user_repository + +router = APIRouter() + +@router.get("/my-feature/") +async def get_my_feature(user: UserInDB = Depends(get_current_active_user)): + """Get feature data""" + return {"feature": "data"} + +@router.post("/my-feature/") +async def create_my_feature( + data: dict, + user: UserInDB = Depends(get_current_active_user), +): + """Create new feature""" + return {"status": "created"} +``` + +#### Step 2: Register Router in Central Router +Edit `app/api/v1/router.py` to include your new router: + +```python +# app/api/v1/router.py +from app.api.v1.endpoints import my_feature # Import your endpoint module + +api_router = APIRouter() + +# Add your router +api_router.include_router( + my_feature.router, + prefix="/my-feature", # Optional: URL prefix + tags=["My Feature"] # For OpenAPI documentation +) +``` + +#### Step 3: Authentication & Authorization +- **No Auth Required**: Endpoint is public +- **Authenticated Users**: Use `Depends(get_current_active_user)` +- **Role-Based Access**: Use `Depends(require_admin)` or `Depends(require_role(UserRole.OPERATOR))` + +```python +from app.auth.permissions import require_admin, require_role +from app.domain.models.role import UserRole + +@router.post("/admin-only/") +async def admin_only(user: UserInDB = Depends(require_admin)): + return {"admin": user.username} +``` + +#### Step 4: Response Models +Use Pydantic schemas for validation: + +```python +from pydantic import BaseModel +from typing import Optional + +class MyFeatureResponse(BaseModel): + id: int + name: str + status: Optional[str] = None + +@router.get("/my-feature/{id}", response_model=MyFeatureResponse) +async def get_feature(id: int): + return MyFeatureResponse(id=id, name="Feature", status="active") +``` + +### Router Registration Pattern +The central router (`app/api/v1/router.py`) uses `include_router()` to compose all endpoints: + +```python +# Current structure (98 lines): +api_router.include_router(auth.router, prefix="/auth", tags=["Auth"]) +api_router.include_router(project.router, tags=["Project"]) +api_router.include_router(simulation.router, tags=["Simulation Control"]) +# ... 30+ more routers +``` + +--- + +## 3. Database Interaction Patterns + +### ORM & Database Strategy +**TJWater uses psycopg (PostgreSQL async driver) directly, NOT SQLAlchemy ORM** + +#### Multiple Database Types +- **PostgreSQL**: Main relational data (users, metadata) +- **TimescaleDB**: Time-series data (SCADA, sensor readings) +- **InfluxDB**: Alternative time-series storage (optional) +- **Metadata DB**: Separate instance (`system_hub` database) + +### Database Layer Architecture + +#### 1. Database Instances (`app/infra/db/postgresql/database.py`) + +```python +class Database: + """Manages async connection pooling""" + + def __init__(self, db_name=None): + self.pool = None + self.db_name = db_name + + def init_pool(self, db_name=None): + """Initialize connection pool""" + self.pool = psycopg_pool.AsyncConnectionPool(...) + + async def get_connection(self) -> AsyncGenerator: + """Yield connection from pool""" + async with self.pool.connection() as conn: + yield conn + +# Global instances +db = Database() # Default PostgreSQL +_database_instances = {} # Per-project caches +``` + +#### 2. Repository Pattern (`app/infra/repositories/`) + +```python +class UserRepository: + """Data access layer for users""" + + def __init__(self, db: Database): + self.db = db + + async def create_user(self, user: UserCreate) -> Optional[UserInDB]: + query = """ + INSERT INTO users (username, email, hashed_password, role) + VALUES (%(username)s, %(email)s, %(hashed_password)s, %(role)s) + RETURNING id, username, email, role, created_at + """ + async with self.db.get_connection() as conn: + async with conn.cursor() as cur: + await cur.execute(query, { + 'username': user.username, + 'email': user.email, + 'hashed_password': get_password_hash(user.password), + 'role': user.role.value + }) + row = await cur.fetchone() + return UserInDB(**row) if row else None +``` + +#### 3. Query Execution Pattern + +```python +# Reading data +async with db.get_connection() as conn: + async with conn.cursor() as cur: + await cur.execute("SELECT * FROM users WHERE id = %s", (user_id,)) + row = await cur.fetchone() # Returns dict_row + +# Writing data +async with db.get_connection() as conn: + async with conn.cursor() as cur: + await cur.execute( + "UPDATE users SET email = %s WHERE id = %s", + (new_email, user_id) + ) + # conn auto-commits (if autocommit=True in pool config) +``` + +### Database Configuration + +Via `app/core/config.py` and `.env`: + +```python +# Main PostgreSQL (users, metadata) +DB_HOST = "localhost" +DB_PORT = "5432" +DB_NAME = "tjwater" +DB_USER = "postgres" +DB_PASSWORD = "password" + +# TimescaleDB (time-series) +TIMESCALEDB_DB_HOST = "localhost" +TIMESCALEDB_DB_PORT = "5433" +TIMESCALEDB_DB_NAME = "szh" + +# Metadata DB (system info) +METADATA_DB_NAME = "system_hub" +``` + +### Dynamic Multi-Database Support + +Project-specific databases are managed dynamically: + +```python +# app/infra/db/dynamic_manager.py +async def get_database_instance(db_name: Optional[str] = None) -> Database: + """Get or create DB instance for project""" + if not db_name: + return db # Default instance + + if db_name not in _database_instances: + instance = Database(db_name=db_name) + instance.init_pool() + await instance.open() + _database_instances[db_name] = instance + + return _database_instances[db_name] +``` + +### Migrations & Schema Creation + +- **Location**: `resources/sql/` contains initialization scripts +- **Naming**: Numbered order (e.g., `002_create_audit_logs_table.sql`) +- **Approach**: SQL scripts are manually executed; no Alembic/Flyway framework +- **Example**: `resources/sql/002_create_audit_logs_table.sql` + +--- + +## 4. Native Module Integration + +### Architecture: Python ↔ Binary Code + +TJWater integrates compiled C/Go binaries for water network simulation: + +``` +Python (FastAPI) + ↓ +app/native/api/ + ├── project.py (list, create, delete, open projects) + ├── s2_junctions.py (CRUD for junctions) + ├── s5_pipes.py (CRUD for pipes) + └── [40+ schema modules] (network elements) + ↓ +app/native/api_encap/ (Low-level binary wrappers) + └── api.so / api.dll (Compiled binary library) +``` + +### How Python Calls Native Code + +#### 1. Native API Initialization +```python +# app/native/api/__init__.py +from app.native.api_encap.api import ( + ChangeSet, # Change tracking + API_ADD, API_UPDATE, API_DELETE, # Operations + list_project, create_project, open_project, close_project, + # ... 100+ functions +) +``` + +#### 2. Example: Project Management +```python +# app/services/tjnetwork.py +from app.native.api import list_project, open_project, close_project + +def list_project() -> list[str]: + """List all project databases""" + # Calls into compiled binary to enumerate DB names + return api.list_project() + +def open_project(name: str) -> None: + """Open project and load into memory""" + # Binary loads .inp file or DB data + api.open_project(name) +``` + +#### 3. ChangeSet Tracking +```python +# For tracking network modifications +class ChangeSet: + """Tracks ADD/UPDATE/DELETE operations on network elements""" + def __init__(self): + self.adds = [] + self.updates = [] + self.deletes = [] + +# Usage in services +def update_junction(name: str, demand: float): + change = api.set_junction(name, {"demand": demand}) + # Returns ChangeSet for auditing/undo-redo +``` + +#### 4. Network CRUD Operations +```python +# All operations follow this pattern: +from app.native.api import get_all_junctions, add_junction, set_junction + +# Read +junctions = get_all_junctions() # Returns list of dicts + +# Create +add_junction({"id": "J1", "x": 0.0, "y": 0.0, "demand": 100.0}) + +# Update +set_junction("J1", {"demand": 150.0}) + +# Delete +delete_junction("J1") +``` + +### Data Flow Example: Simulation +``` +FastAPI Request (POST /simulation/run) + ↓ +app/api/v1/endpoints/simulation.py + ↓ +app/services/simulation.py (business logic) + ↓ +app/native/api/ (calls binary functions) + ├── open_project("szh") + ├── set_option("QUALITY", "CHEMICAL") + ├── run_simulation() + └── get_result() + ↓ +TimescaleDB/InfluxDB (store results) + ↓ +Response to client +``` + +### Python Module Constants +Native module exports constants for enumerations: + +```python +from app.native.api import ( + OPTION_UNITS_LPS, # Flow units + OPTION_PRESSURE_KPA, # Pressure units + OPTION_QUALITY_CHEMICAL, # Quality type + PIPE_STATUS_OPEN, # Pipe status + SCADA_DEVICE_TYPE_PRESSURE, +) +``` + +--- + +## 5. Authentication & Authorization Mechanisms + +### Architecture: JWT + OAuth2 + Role-Based Access Control (RBAC) + +### 1. Authentication Flow + +#### User Registration +```python +# app/api/v1/endpoints/auth.py +@router.post("/register", response_model=UserResponse) +async def register(user_data: UserCreate): + # 1. Hash password + hashed = get_password_hash(user_data.password) + + # 2. Store in DB via repository + user = await user_repo.create_user({ + 'username': user_data.username, + 'email': user_data.email, + 'hashed_password': hashed, + 'role': 'USER' + }) + return UserResponse.model_validate(user) +``` + +#### User Login & Token Generation +```python +@router.post("/login", response_model=Token) +async def login(form_data: OAuth2PasswordRequestForm = Depends()): + # 1. Verify credentials + user = await user_repo.get_user_by_username(form_data.username) + if not verify_password(form_data.password, user.hashed_password): + raise HTTPException(status_code=401, detail="Invalid credentials") + + # 2. Generate JWT tokens + access_token = create_access_token(subject=user.username) + refresh_token = create_refresh_token(subject=user.username) + + return Token( + access_token=access_token, + refresh_token=refresh_token, + token_type="bearer", + expires_in=settings.ACCESS_TOKEN_EXPIRE_MINUTES * 60 + ) +``` + +### 2. JWT Token Configuration +```python +# app/core/config.py +SECRET_KEY: str = "your-secret-key" # Change in production +ALGORITHM: str = "HS256" +ACCESS_TOKEN_EXPIRE_MINUTES: int = 30 +REFRESH_TOKEN_EXPIRE_DAYS: int = 7 +``` + +### 3. Token Validation & Current User +```python +# app/auth/dependencies.py +oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/v1/auth/login") + +async def get_current_user(token: str = Depends(oauth2_scheme)) -> UserInDB: + """Validate JWT token and extract user""" + try: + payload = jwt.decode(token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM]) + username: str = payload.get("sub") + token_type: str = payload.get("type", "access") + + if username is None or token_type != "access": + raise HTTPException(status_code=401, detail="Invalid token") + + # Fetch user from DB + user = await user_repo.get_user_by_username(username) + if user is None: + raise HTTPException(status_code=401, detail="User not found") + + return user + except JWTError: + raise HTTPException(status_code=401, detail="Invalid token") +``` + +### 4. Role-Based Access Control (RBAC) + +#### User Roles Enum +```python +# app/domain/models/role.py +class UserRole(str, Enum): + ADMIN = "ADMIN" # Full permissions + OPERATOR = "OPERATOR" # Modify data + USER = "USER" # Read/write + VIEWER = "VIEWER" # Read-only + +# Hierarchy +get_hierarchy() = { + VIEWER: 1, + USER: 2, + OPERATOR: 3, + ADMIN: 4, +} +``` + +#### Permission Decorators +```python +# app/auth/permissions.py + +def require_admin(current_user: UserInDB = Depends(get_current_active_user)) -> UserInDB: + """Restrict to admin users""" + if current_user.role != UserRole.ADMIN: + raise HTTPException(status_code=403, detail="Admin access required") + return current_user + +def require_role(role: UserRole): + """Factory for role-based permission checks""" + async def permission_checker( + current_user: UserInDB = Depends(get_current_active_user) + ) -> UserInDB: + if not UserRole(current_user.role).has_permission(role): + raise HTTPException(status_code=403, detail=f"Role {role} required") + return current_user + return permission_checker + +def require_operator(current_user: UserInDB = Depends(get_current_active_user)) -> UserInDB: + return current_user if require_role(UserRole.OPERATOR) else None +``` + +#### Using Permissions in Endpoints +```python +# Require admin +@router.delete("/users/{user_id}") +async def delete_user( + user_id: int, + admin: UserInDB = Depends(require_admin) +): + await user_repo.delete_user(user_id) + return {"status": "deleted"} + +# Require operator or higher +@router.post("/projects/") +async def create_project( + data: dict, + operator: UserInDB = Depends(require_operator) +): + return {"project": "created"} +``` + +### 5. Optional Keycloak Integration +```python +# app/auth/keycloak_dependencies.py +# Alternative: Validate tokens from external Keycloak server + +KEYCLOAK_PUBLIC_KEY: str = "" # From .env +KEYCLOAK_ALGORITHM: str = "RS256" + +# Similar JWT validation but with Keycloak's public key +``` + +### 6. Token Schema +```python +# app/domain/schemas/user.py +class Token(BaseModel): + access_token: str + refresh_token: Optional[str] = None + token_type: str = "bearer" + expires_in: int # Seconds + +class UserInDB(BaseModel): + id: int + username: str + email: str + role: str # "ADMIN", "USER", etc. + is_active: bool + is_superuser: bool +``` + +--- + +## 6. Configuration Management + +### Configuration Hierarchy +``` +Environment Variables (.env file) + ↓ +app/core/config.py (Pydantic Settings) + ↓ +Injected via Depends() in endpoints/services + ↓ +Runtime usage +``` + +### Config File: `app/core/config.py` +Uses Pydantic v2 `BaseSettings` with `.env` support: + +```python +from pydantic_settings import BaseSettings + +class Settings(BaseSettings): + # App + PROJECT_NAME: str = "TJWater Server" + API_V1_STR: str = "/api/v1" + + # Security + SECRET_KEY: str = "your-secret" + ALGORITHM: str = "HS256" + ACCESS_TOKEN_EXPIRE_MINUTES: int = 30 + + # Encryption (Fernet symmetric) + ENCRYPTION_KEY: str = "" # Required from .env + DATABASE_ENCRYPTION_KEY: str = "" + + # PostgreSQL + DB_NAME: str = "tjwater" + DB_HOST: str = "localhost" + DB_PORT: str = "5432" + DB_USER: str = "postgres" + DB_PASSWORD: str = "password" + + # TimescaleDB + TIMESCALEDB_DB_NAME: str = "tjwater" + TIMESCALEDB_DB_HOST: str = "localhost" + TIMESCALEDB_DB_PORT: str = "5433" + + # Metadata DB + METADATA_DB_NAME: str = "system_hub" + METADATA_DB_HOST: str = "localhost" + METADATA_DB_PORT: str = "5432" + + # Cache sizing + PROJECT_PG_CACHE_SIZE: int = 50 + PROJECT_TS_CACHE_SIZE: int = 50 + + # Connection pooling + PROJECT_PG_POOL_SIZE: int = 5 + PROJECT_TS_POOL_MIN_SIZE: int = 1 + PROJECT_TS_POOL_MAX_SIZE: int = 10 + + # InfluxDB (optional) + INFLUXDB_URL: str = "http://localhost:8086" + INFLUXDB_TOKEN: str = "token" + + @property + def SQLALCHEMY_DATABASE_URI(self) -> str: + return f"postgresql://{self.DB_USER}:{self.DB_PASSWORD}@{self.DB_HOST}:{self.DB_PORT}/{self.DB_NAME}" + + model_config = SettingsConfigDict( + env_file=".env", + extra="ignore", + ) + +settings = Settings() +``` + +### Environment File: `.env.example` +```bash +# Security +SECRET_KEY=your-secret-key-change-in-production +ENCRYPTION_KEY= +DATABASE_ENCRYPTION_KEY= + +# PostgreSQL +DB_HOST=localhost +DB_PORT=5432 +DB_NAME=tjwater +DB_USER=postgres +DB_PASSWORD=password + +# TimescaleDB +TIMESCALEDB_DB_NAME=szh +TIMESCALEDB_DB_HOST=localhost +TIMESCALEDB_DB_PORT=5433 +TIMESCALEDB_DB_USER=tjwater +TIMESCALEDB_DB_PASSWORD=Tjwater@123456 + +# Metadata DB +METADATA_DB_NAME=system_hub +METADATA_DB_HOST=localhost + +# Cache/Pool +PROJECT_PG_CACHE_SIZE=50 +PROJECT_TS_CACHE_SIZE=50 +``` + +### Using Configuration in Code +```python +from app.core.config import settings + +# Direct access +db_url = settings.SQLALCHEMY_DATABASE_URI +secret = settings.SECRET_KEY + +# In FastAPI lifespan +@asynccontextmanager +async def lifespan(app: FastAPI): + # Initialize based on settings + db.init_pool(settings.DB_NAME) + yield + await db.close() +``` + +### Encryption Configuration +```python +# app/core/encryption.py +from app.core.config import settings + +class Encryptor: + def __init__(self, key: Optional[bytes] = None): + if key is None: + key_str = settings.ENCRYPTION_KEY + if not key_str: + raise ValueError("ENCRYPTION_KEY not configured") + key = key_str.encode() + self.fernet = Fernet(key) + + def encrypt(self, data: str) -> str: + return self.fernet.encrypt(data.encode()).decode() + + def decrypt(self, data: str) -> str: + return self.fernet.decrypt(data.encode()).decode() + + @staticmethod + def generate_key() -> str: + """Generate new encryption key""" + return Fernet.generate_key().decode() +``` + +--- + +## 7. Testing Setup + +### Test Structure +``` +tests/ +├── conftest.py # Pytest configuration & fixtures +├── api/ # API integration tests +│ ├── test_api_integration.py +│ └── test_leakage_endpoints.py +├── unit/ # Unit tests +│ ├── test_burst_location_service.py +│ ├── test_metadata_repository_dsn_decrypt.py +│ └── test_pipeline_health_analyzer.py +└── auth/ # Auth tests + └── test_encryption.py +``` + +### Configuration: `tests/conftest.py` +```python +import pytest +import sys +import os + +# Add project root to path +sys.path.insert(0, os.path.dirname(os.path.dirname(__file__))) + +def run_this_test(test_file): + """Run a single test file""" + test_name = os.path.splitext(os.path.basename(test_file))[0] + pytest.main([test_file, "-v"]) +``` + +### Testing Tools +```python +# requirements.txt includes: +pytest==8.3.5 # Test runner +# httpx (via FastAPI) for async HTTP client +``` + +### Example: API Integration Test +```python +# tests/api/test_api_integration.py +import pytest + +@pytest.mark.parametrize( + "module_name, desc", + [ + ("app.core.encryption", "Encryption"), + ("app.auth.permissions", "Permissions"), + ("app.api.v1.endpoints.auth", "Auth endpoints"), + ], +) +def test_module_imports(module_name, desc): + """Verify critical modules can be imported""" + try: + __import__(module_name) + except ImportError as e: + pytest.fail(f"Cannot import {desc}: {e}") + +def test_router_configuration(): + """Verify router is properly configured""" + from app.api.v1 import router + api_router = router.api_router + routes = [r.path for r in api_router.routes if hasattr(r, "path")] + assert len(routes) > 0 +``` + +### Example: Encryption Test +```python +# tests/auth/test_encryption.py +from app.core.encryption import Encryptor + +def test_encrypt_decrypt(): + """Test encryption roundtrip""" + encryptor = Encryptor.generate_key() + enc = Encryptor(encryptor) + + plaintext = "sensitive_data" + encrypted = enc.encrypt(plaintext) + decrypted = enc.decrypt(encrypted) + + assert decrypted == plaintext +``` + +### Running Tests +```bash +# Run all tests +pytest tests/ -v + +# Run specific test file +pytest tests/api/test_api_integration.py -v + +# Run specific test +pytest tests/auth/test_encryption.py::test_encrypt_decrypt -v +``` + +--- + +## 8. Audit & Security Features + +### Audit Middleware +Automatically logs all critical operations: + +```python +# app/infra/audit/middleware.py +class AuditMiddleware(BaseHTTPMiddleware): + AUDIT_METHODS = ["POST", "PUT", "DELETE", "PATCH"] + AUDIT_TAGS = ["Audit", "Users", "Project", "Junctions", "Pipes", ...] + + async def dispatch(self, request: Request, call_next): + # 1. Capture request body (for writes) + # 2. Execute request + # 3. Log to audit DB if matches criteria + # 4. Return response +``` + +### Audit Logging +```python +# app/core/audit.py +async def log_audit_event( + action: AuditAction, # "CREATE", "UPDATE", "DELETE", "LOGIN" + resource_type: str, # "user", "project", "pipe" + resource_id: str, + actor_id: int, # User ID + details: dict = None +): + """Log operation to audit table""" + # Persists to audit_logs table via AuditRepository +``` + +### Audit Schema +```python +# app/domain/schemas/audit.py +class AuditLog(BaseModel): + id: int + timestamp: datetime + action: str + resource_type: str + resource_id: str + actor_id: int + details: Optional[dict] = None +``` + +--- + +## 9. Key Dependencies & Tech Stack + +### Core Framework +``` +FastAPI==0.128.0 # Web framework +uvicorn==0.34.0 # ASGI server +Pydantic==2.10.6 # Data validation +pydantic-settings==2.12.0 # Configuration management +``` + +### Database +``` +psycopg==3.2.5 # PostgreSQL async driver +psycopg-pool==3.3.0 # Connection pooling +GeoAlchemy2==0.17.1 # GIS support +SQLAlchemy==2.0.41 # ORM (optional) +``` + +### Security & Auth +``` +PyJWT==2.10.1 # JWT handling +python-jose==3.5.0 # Token validation +passlib==1.7.4 # Password hashing +cryptography==46.0.3 # Encryption (Fernet) +Authlib==1.6.6 # OAuth2 support +``` + +### Data Science / Simulation +``` +wntr==1.3.2 # Water network toolkit (EPANET) +numpy==1.26.2 # Numerical computing +pandas==2.2.3 # Data frames +scipy==1.15.2 # Scientific computing +scikit-learn==1.6.1 # ML algorithms +``` + +### Time-Series & Data Storage +``` +influxdb-client==1.48.0 # InfluxDB client +redis==5.2.1 # Caching & sessions +``` + +### Utilities +``` +python-dotenv==1.2.1 # .env file support +python-multipart==0.0.20 # File upload handling +email-validator==2.3.0 # Email validation +``` + +--- + +## 10. Documentation & Deployment + +### Key Documentation Files +- **SECURITY_README.md**: Complete security implementation guide +- **DEPLOYMENT.md**: Production deployment checklist +- **INTEGRATION_CHECKLIST.md**: System integration steps +- **setup_server.md**: Initial server setup +- **setup_influxdb.md**: InfluxDB configuration + +### Docker Deployment +```yaml +# infra/docker/docker-compose.yml +services: + api: + build: ... + ports: ["8000:8000"] + environment: + - DB_HOST=postgis + - TIMESCALEDB_HOST=timescaledb + - REDIS_HOST=redis + + postgis: + image: postgis/postgis:14-3.5 + + timescaledb: + image: timescale/timescaledb:latest-pg15 + + redis: + image: redis:latest + + influxdb: + image: influxdb:2.7 + + keycloak: + image: keycloak/keycloak:latest + + grafana: + image: grafana/grafana:latest +``` + +--- + +## 11. Recommended Documentation to Create + +### For Secondary Development Team: + +1. **API Development Guide** + - How to add new endpoints (covered in Section 2) + - Common endpoint patterns + - Error handling standards + - Response format conventions + +2. **Database Development Guide** + - Repository pattern usage + - Query building best practices + - Migration procedures + - Multi-database considerations + +3. **Native Module Integration Guide** + - How to add new native functions + - ChangeSet tracking + - Binary library updates + - Debugging native code calls + +4. **Authentication & Authorization** + - Token generation and validation + - Role-based access control implementation + - Permission decorator usage + - Keycloak integration + +5. **Testing & CI/CD** + - Unit test patterns + - Integration test setup + - Test database configuration + - GitHub Actions workflow + +6. **Deployment & DevOps** + - Docker build & deployment + - Kubernetes manifests + - Environment variable management + - Production security checklist + +7. **Troubleshooting Guide** + - Common issues and solutions + - Database connection problems + - Authentication debugging + - Native module errors + +--- + +## Summary Table: Key Components + +| Component | Location | Purpose | Key Technology | +|-----------|----------|---------|-----------------| +| **API Routes** | `app/api/v1/endpoints/` | REST endpoints | FastAPI, Pydantic | +| **Router** | `app/api/v1/router.py` | Central route registration | APIRouter | +| **Auth** | `app/auth/` | JWT/OAuth2 validation | PyJWT, passlib | +| **Database** | `app/infra/db/` | Data persistence | psycopg, async pools | +| **Repository** | `app/infra/repositories/` | Data access layer | psycopg cursor | +| **Domain** | `app/domain/` | Models & schemas | Pydantic, Enum | +| **Services** | `app/services/` | Business logic | Python, native APIs | +| **Native API** | `app/native/api/` | Binary integration | ctypes/cffi, .so/.dll | +| **Config** | `app/core/config.py` | Settings management | Pydantic Settings | +| **Security** | `app/core/security.py` | Encryption, hashing | cryptography, passlib | +| **Audit** | `app/infra/audit/` | Operation logging | Middleware, Repository | +| **Tests** | `tests/` | Test suite | pytest | + +--- + +*This guide provides the essential patterns and structures needed for secondary development on TJWaterServerBinary. Refer to individual source files for detailed implementation examples.*