新增 API 测试用例,修复失效接口问题

This commit is contained in:
2026-05-21 15:32:12 +08:00
parent 751950e5b5
commit 2317f4d527
15 changed files with 1486 additions and 96 deletions
+79
View File
@@ -0,0 +1,79 @@
import asyncio
from datetime import datetime, timezone
from uuid import uuid4
from app.infra.db.metadb.repositories.audit_repository import AuditRepository
from tests.conftest import FakeAsyncSession, FakeExecuteResult, make_audit_log
def test_create_log_adds_commits_and_refreshes(monkeypatch):
class FakeAuditLog:
def __init__(self, **kwargs):
self.id = uuid4()
for key, value in kwargs.items():
setattr(self, key, value)
session = FakeAsyncSession()
repo = AuditRepository(session)
monkeypatch.setattr(
"app.infra.db.metadb.repositories.audit_repository.models.AuditLog",
FakeAuditLog,
)
result = asyncio.run(
repo.create_log(
action="LOGIN",
request_method="POST",
request_path="/auth/login",
response_status=200,
)
)
assert result.action == "LOGIN"
assert result.request_method == "POST"
assert session.commit_count == 1
assert len(session.added) == 1
assert len(session.refreshed) == 1
def test_get_logs_builds_filtered_query_and_returns_models():
log = make_audit_log(action="UPDATE_USER", resource_type="user")
session = FakeAsyncSession(
execute_results=[FakeExecuteResult(rows=[log])],
)
repo = AuditRepository(session)
user_id = uuid4()
project_id = uuid4()
start_time = datetime(2025, 1, 1, tzinfo=timezone.utc)
results = asyncio.run(
repo.get_logs(
user_id=user_id,
project_id=project_id,
action="UPDATE_USER",
resource_type="user",
start_time=start_time,
skip=5,
limit=10,
)
)
assert len(results) == 1
assert results[0].action == "UPDATE_USER"
stmt = session.executed[0]
assert len(stmt._where_criteria) == 5
assert stmt._offset == 5
assert stmt._limit == 10
def test_get_log_count_returns_zero_when_scalar_none():
session = FakeAsyncSession(
execute_results=[FakeExecuteResult(scalar_value=None)],
)
repo = AuditRepository(session)
result = asyncio.run(repo.get_log_count(action="DELETE_USER"))
assert result == 0
stmt = session.executed[0]
assert len(stmt._where_criteria) == 1
+97
View File
@@ -0,0 +1,97 @@
import asyncio
from types import SimpleNamespace
from unittest.mock import AsyncMock
import pytest
from fastapi import HTTPException
from app.auth import dependencies
from app.core.security import create_access_token, create_refresh_token
from tests.conftest import make_user
def test_get_db_returns_app_state_db():
request = SimpleNamespace(app=SimpleNamespace(state=SimpleNamespace(db="db-instance")))
result = asyncio.run(dependencies.get_db(request))
assert result == "db-instance"
def test_get_db_raises_when_database_missing():
request = SimpleNamespace(app=SimpleNamespace(state=SimpleNamespace()))
with pytest.raises(HTTPException) as exc_info:
asyncio.run(dependencies.get_db(request))
assert exc_info.value.status_code == 503
assert exc_info.value.detail == "Database not initialized"
def test_get_current_user_accepts_valid_access_token():
repo = SimpleNamespace(get_user_by_username=AsyncMock(return_value=make_user()))
result = asyncio.run(
dependencies.get_current_user(
token=create_access_token("tester"),
user_repo=repo,
)
)
assert result.username == "tester"
repo.get_user_by_username.assert_awaited_once_with("tester")
def test_get_current_user_rejects_refresh_token():
repo = SimpleNamespace(get_user_by_username=AsyncMock())
with pytest.raises(HTTPException) as exc_info:
asyncio.run(
dependencies.get_current_user(
token=create_refresh_token("tester"),
user_repo=repo,
)
)
assert exc_info.value.status_code == 401
assert exc_info.value.detail == "Invalid token type. Access token required."
repo.get_user_by_username.assert_not_awaited()
def test_get_current_user_rejects_missing_user():
repo = SimpleNamespace(get_user_by_username=AsyncMock(return_value=None))
with pytest.raises(HTTPException) as exc_info:
asyncio.run(
dependencies.get_current_user(
token=create_access_token("ghost"),
user_repo=repo,
)
)
assert exc_info.value.status_code == 401
assert exc_info.value.detail == "Could not validate credentials"
def test_get_current_active_user_rejects_inactive_user():
with pytest.raises(HTTPException) as exc_info:
asyncio.run(
dependencies.get_current_active_user(
current_user=make_user(is_active=False),
)
)
assert exc_info.value.status_code == 403
assert exc_info.value.detail == "Inactive user"
def test_get_current_superuser_rejects_non_superuser():
with pytest.raises(HTTPException) as exc_info:
asyncio.run(
dependencies.get_current_superuser(
current_user=make_user(is_superuser=False),
)
)
assert exc_info.value.status_code == 403
assert exc_info.value.detail == "Not enough privileges. Superuser access required."
+56
View File
@@ -0,0 +1,56 @@
import asyncio
import pytest
from fastapi import HTTPException
from app.auth import permissions
from app.domain.models.role import UserRole
from tests.conftest import make_user
def test_require_role_allows_higher_privilege_user():
checker = permissions.require_role(UserRole.OPERATOR)
result = asyncio.run(checker(current_user=make_user(role=UserRole.ADMIN)))
assert result.role == UserRole.ADMIN
def test_require_role_rejects_insufficient_role():
checker = permissions.require_role(UserRole.ADMIN)
with pytest.raises(HTTPException) as exc_info:
asyncio.run(checker(current_user=make_user(role=UserRole.USER)))
assert exc_info.value.status_code == 403
assert "Required role: ADMIN" in exc_info.value.detail
def test_check_resource_owner_allows_admin():
assert permissions.check_resource_owner(
99,
make_user(id=1, role=UserRole.ADMIN),
) is True
def test_check_resource_owner_allows_owner():
assert permissions.check_resource_owner(
7,
make_user(id=7, role=UserRole.USER),
) is True
def test_check_resource_owner_rejects_other_user():
assert permissions.check_resource_owner(
7,
make_user(id=8, role=UserRole.USER),
) is False
def test_require_owner_or_admin_rejects_other_user():
checker = permissions.require_owner_or_admin(7)
with pytest.raises(HTTPException) as exc_info:
asyncio.run(checker(current_user=make_user(id=8, role=UserRole.USER)))
assert exc_info.value.status_code == 403
assert exc_info.value.detail == "You don't have permission to access this resource"
+19 -18
View File
@@ -1,9 +1,8 @@
import asyncio
from datetime import datetime, timezone
import importlib.util
from pathlib import Path
import pytest
def _load_scada_repository():
module_path = (
@@ -50,18 +49,19 @@ class _FakeConnection:
return self.cursor_instance
@pytest.mark.asyncio
async def test_update_scada_field_inserts_when_update_hits_no_rows():
def test_update_scada_field_inserts_when_update_hits_no_rows():
ScadaRepository = _load_scada_repository()
conn = _FakeConnection(initial_rowcount=0)
point_time = datetime(2026, 1, 1, 0, 0, tzinfo=timezone.utc)
await ScadaRepository.update_scada_field(
conn,
point_time,
"170490",
"cleaned_value",
26.5,
asyncio.run(
ScadaRepository.update_scada_field(
conn,
point_time,
"170490",
"cleaned_value",
26.5,
)
)
assert len(conn.cursor_instance.calls) == 2
@@ -69,18 +69,19 @@ async def test_update_scada_field_inserts_when_update_hits_no_rows():
assert "INSERT INTO scada.scada_data" in conn.cursor_instance.calls[1][0]
@pytest.mark.asyncio
async def test_update_scada_field_skips_insert_when_update_succeeds():
def test_update_scada_field_skips_insert_when_update_succeeds():
ScadaRepository = _load_scada_repository()
conn = _FakeConnection(initial_rowcount=1)
point_time = datetime(2026, 1, 1, 0, 0, tzinfo=timezone.utc)
await ScadaRepository.update_scada_field(
conn,
point_time,
"170490",
"cleaned_value",
26.5,
asyncio.run(
ScadaRepository.update_scada_field(
conn,
point_time,
"170490",
"cleaned_value",
26.5,
)
)
assert len(conn.cursor_instance.calls) == 1
+124
View File
@@ -0,0 +1,124 @@
import asyncio
from unittest.mock import AsyncMock
import pytest
from app.domain.models.role import UserRole
from app.domain.schemas.user import UserCreate, UserUpdate
from app.infra.db.metadb.repositories.user_repository import UserRepository
from tests.conftest import FakeCursor, FakeDB
def _user_row(**overrides):
base = {
"id": 1,
"username": "tester",
"email": "tester@example.com",
"hashed_password": "hashed-password",
"role": "USER",
"is_active": True,
"is_superuser": False,
"created_at": "2025-01-01T00:00:00+00:00",
"updated_at": "2025-01-01T00:00:00+00:00",
}
base.update(overrides)
return base
def test_create_user_hashes_password_and_returns_model(monkeypatch):
cursor = FakeCursor(fetchone_results=[_user_row()])
repo = UserRepository(FakeDB(cursor))
monkeypatch.setattr(
"app.infra.db.metadb.repositories.user_repository.get_password_hash",
lambda password: f"hashed::{password}",
)
result = asyncio.run(
repo.create_user(
UserCreate(
username="tester",
email="tester@example.com",
password="secret123",
)
)
)
assert result is not None
assert result.username == "tester"
assert cursor.executed[0][1]["hashed_password"] == "hashed::secret123"
def test_update_user_without_fields_returns_existing_user(monkeypatch):
repo = UserRepository(FakeDB(FakeCursor()))
existing_user = AsyncMock(return_value="existing")
monkeypatch.setattr(repo, "get_user_by_id", existing_user)
result = asyncio.run(repo.update_user(1, UserUpdate()))
assert result == "existing"
existing_user.assert_awaited_once_with(1)
def test_update_user_builds_dynamic_query(monkeypatch):
cursor = FakeCursor(fetchone_results=[_user_row(role="ADMIN", email="new@example.com")])
repo = UserRepository(FakeDB(cursor))
monkeypatch.setattr(
"app.infra.db.metadb.repositories.user_repository.get_password_hash",
lambda password: f"hashed::{password}",
)
result = asyncio.run(
repo.update_user(
1,
UserUpdate(
email="new@example.com",
password="new-secret",
role=UserRole.ADMIN,
is_active=False,
),
),
)
assert result is not None
query, params = cursor.executed[0]
assert "email = %(email)s" in query
assert "hashed_password = %(hashed_password)s" in query
assert "role = %(role)s" in query
assert "is_active = %(is_active)s" in query
assert params["hashed_password"] == "hashed::new-secret"
assert params["role"] == "ADMIN"
assert params["is_active"] is False
def test_delete_user_returns_false_when_execute_raises():
cursor = FakeCursor()
cursor.execute = AsyncMock(side_effect=RuntimeError("boom"))
repo = UserRepository(FakeDB(cursor))
result = asyncio.run(repo.delete_user(1))
assert result is False
def test_user_exists_short_circuits_without_filters():
cursor = FakeCursor()
repo = UserRepository(FakeDB(cursor))
result = asyncio.run(repo.user_exists())
assert result is False
assert cursor.executed == []
def test_user_exists_checks_username_or_email():
cursor = FakeCursor(fetchone_results=[{"exists": True}])
repo = UserRepository(FakeDB(cursor))
result = asyncio.run(
repo.user_exists(username="tester", email="tester@example.com")
)
assert result is True
query, params = cursor.executed[0]
assert "username = %(username)s OR email = %(email)s" in query
assert params == {"username": "tester", "email": "tester@example.com"}