Files
TJWaterServerBinary/app/api/v1/endpoints/snapshots.py

112 lines
3.8 KiB
Python

from fastapi import APIRouter, Request
from app.native.api import ChangeSet
from app.services.tjnetwork import (
get_current_operation,
execute_undo,
execute_redo,
list_snapshot,
have_snapshot,
have_snapshot_for_operation,
have_snapshot_for_current_operation,
take_snapshot_for_operation,
take_snapshot_for_current_operation,
take_snapshot,
pick_snapshot,
pick_operation,
sync_with_server,
execute_batch_commands,
execute_batch_command,
get_restore_operation,
set_restore_operation,
)
# Router on which the endpoints declared below register themselves via
# the @router.get / @router.post decorators.
router = APIRouter()
@router.get("/getcurrentoperationid/")
async def get_current_operation_id_endpoint(network: str) -> int:
    """Return the current operation id reported for ``network``.

    Thin HTTP wrapper: the value comes straight from
    ``get_current_operation(network)``.

    Args:
        network: Network identifier, forwarded unchanged to the service layer.

    Returns:
        The value produced by ``get_current_operation``.
    """
    current_operation_id = get_current_operation(network)
    return current_operation_id
@router.post("/undo/")
async def undo_endpoint(network: str):
    """Run ``execute_undo`` for ``network`` and hand back its result unchanged.

    Args:
        network: Network identifier, forwarded unchanged to the service layer.

    Returns:
        Whatever ``execute_undo`` returns; the type is not declared here.
    """
    undo_result = execute_undo(network)
    return undo_result
@router.post("/redo/")
async def redo_endpoint(network: str):
    """Run ``execute_redo`` for ``network`` and hand back its result unchanged.

    Args:
        network: Network identifier, forwarded unchanged to the service layer.

    Returns:
        Whatever ``execute_redo`` returns; the type is not declared here.
    """
    redo_result = execute_redo(network)
    return redo_result
@router.get("/getsnapshots/")
async def list_snapshot_endpoint(network: str) -> list[tuple[int, str]]:
    """Return the snapshot listing that ``list_snapshot`` yields for ``network``.

    Args:
        network: Network identifier, forwarded unchanged to the service layer.

    Returns:
        The result of ``list_snapshot``, annotated as a list of
        ``(int, str)`` pairs.
    """
    snapshots = list_snapshot(network)
    return snapshots
@router.get("/havesnapshot/")
async def have_snapshot_endpoint(network: str, tag: str) -> bool:
    """Return the result of ``have_snapshot`` for ``network`` and ``tag``.

    Args:
        network: Network identifier, forwarded unchanged to the service layer.
        tag: Snapshot tag to look up, forwarded unchanged.

    Returns:
        The boolean produced by ``have_snapshot``.
    """
    snapshot_exists = have_snapshot(network, tag)
    return snapshot_exists
@router.get("/havesnapshotforoperation/")
async def have_snapshot_for_operation_endpoint(
    network: str,
    operation: int,
) -> bool:
    """Return the result of ``have_snapshot_for_operation``.

    Args:
        network: Network identifier, forwarded unchanged to the service layer.
        operation: Operation id to check, forwarded unchanged.

    Returns:
        The boolean produced by ``have_snapshot_for_operation``.
    """
    snapshot_exists = have_snapshot_for_operation(network, operation)
    return snapshot_exists
@router.get("/havesnapshotforcurrentoperation/")
async def have_snapshot_for_current_operation_endpoint(network: str) -> bool:
    """Return the result of ``have_snapshot_for_current_operation``.

    Args:
        network: Network identifier, forwarded unchanged to the service layer.

    Returns:
        The boolean produced by ``have_snapshot_for_current_operation``.
    """
    snapshot_exists = have_snapshot_for_current_operation(network)
    return snapshot_exists
@router.post("/takesnapshotforoperation/")
async def take_snapshot_for_operation_endpoint(network: str, operation: int, tag: str) -> None:
    """Call ``take_snapshot_for_operation`` with the given arguments.

    Args:
        network: Network identifier, forwarded unchanged to the service layer.
        operation: Operation id the snapshot is taken for, forwarded unchanged.
        tag: Tag to attach to the snapshot, forwarded unchanged.

    Returns:
        The value returned by ``take_snapshot_for_operation`` (annotated
        as ``None``).
    """
    outcome = take_snapshot_for_operation(network, operation, tag)
    return outcome
# NOTE: this route is registered without a trailing slash.
@router.post("/takesnapshotforcurrentoperation")
async def take_snapshot_for_current_operation_endpoint(
    network: str,
    tag: str,
) -> None:
    """Call ``take_snapshot_for_current_operation`` for ``network`` and ``tag``.

    Args:
        network: Network identifier, forwarded unchanged to the service layer.
        tag: Tag to attach to the snapshot, forwarded unchanged.

    Returns:
        The value returned by ``take_snapshot_for_current_operation``
        (annotated as ``None``).
    """
    outcome = take_snapshot_for_current_operation(network, tag)
    return outcome
# Backward compatibility with the old (misspelled) route name:
# takenapshotforcurrentoperation
@router.post("/takenapshotforcurrentoperation")
async def take_snapshot_for_current_operation_legacy_endpoint(network: str, tag: str) -> None:
    """Legacy-route alias that calls ``take_snapshot_for_current_operation``.

    Args:
        network: Network identifier, forwarded unchanged to the service layer.
        tag: Tag to attach to the snapshot, forwarded unchanged.

    Returns:
        The value returned by ``take_snapshot_for_current_operation``
        (annotated as ``None``).
    """
    outcome = take_snapshot_for_current_operation(network, tag)
    return outcome
@router.post("/takesnapshot/")
async def take_snapshot_endpoint(network: str, tag: str) -> None:
    """Call ``take_snapshot`` for ``network`` with the given ``tag``.

    Args:
        network: Network identifier, forwarded unchanged to the service layer.
        tag: Tag to attach to the snapshot, forwarded unchanged.

    Returns:
        The value returned by ``take_snapshot`` (annotated as ``None``).
    """
    outcome = take_snapshot(network, tag)
    return outcome
# response_model=None: keep FastAPI from deriving a response model from the
# ChangeSet return annotation.
@router.post("/picksnapshot/", response_model=None)
async def pick_snapshot_endpoint(
    network: str,
    tag: str,
    discard: bool = False,
) -> ChangeSet:
    """Call ``pick_snapshot`` and return the change set it produces.

    Args:
        network: Network identifier, forwarded unchanged to the service layer.
        tag: Snapshot tag to pick, forwarded unchanged.
        discard: Forwarded as the third argument of ``pick_snapshot``; its
            exact effect is defined there. Defaults to ``False``.

    Returns:
        The value returned by ``pick_snapshot``.
    """
    change_set = pick_snapshot(network, tag, discard)
    return change_set
# response_model=None: keep FastAPI from deriving a response model from the
# ChangeSet return annotation.
@router.post("/pickoperation/", response_model=None)
async def pick_operation_endpoint(network: str, operation: int, discard: bool = False) -> ChangeSet:
    """Call ``pick_operation`` and return the change set it produces.

    Args:
        network: Network identifier, forwarded unchanged to the service layer.
        operation: Operation id to pick, forwarded unchanged.
        discard: Forwarded as the third argument of ``pick_operation``; its
            exact effect is defined there. Defaults to ``False``.

    Returns:
        The value returned by ``pick_operation``.
    """
    change_set = pick_operation(network, operation, discard)
    return change_set
# response_model=None: keep FastAPI from deriving a response model from the
# ChangeSet return annotation.
@router.get("/syncwithserver/", response_model=None)
async def sync_with_server_endpoint(
    network: str,
    operation: int,
) -> ChangeSet:
    """Call ``sync_with_server`` and return the change set it produces.

    Args:
        network: Network identifier, forwarded unchanged to the service layer.
        operation: Operation id forwarded unchanged to ``sync_with_server``.

    Returns:
        The value returned by ``sync_with_server``.
    """
    change_set = sync_with_server(network, operation)
    return change_set
# response_model=None: keep FastAPI from deriving a response model from the
# ChangeSet return annotation.
@router.post("/batch/", response_model=None)
async def execute_batch_commands_endpoint(network: str, req: Request) -> ChangeSet:
    """Execute the operations carried in the JSON request body on ``network``.

    The body must be a JSON object with an ``"operations"`` entry. That value
    is assigned to a fresh ``ChangeSet`` which is then passed to
    ``execute_batch_commands``.

    Args:
        network: Network identifier, forwarded unchanged to the service layer.
        req: Raw request whose body carries the JSON payload.

    Returns:
        The value returned by ``execute_batch_commands``.

    Raises:
        HTTPException: 400 when the body is not valid JSON; 422 when it is
            not a JSON object containing ``"operations"``.
    """
    # Local import keeps this handler self-contained with respect to the
    # module-level import list.
    from fastapi import HTTPException

    try:
        payload = await req.json()
    except ValueError as exc:
        # json.JSONDecodeError and UnicodeDecodeError are both ValueError
        # subclasses; report them as a client error instead of a 500.
        raise HTTPException(
            status_code=400, detail="Request body is not valid JSON"
        ) from exc

    # A non-object body or a missing key would otherwise raise
    # TypeError/KeyError and surface as an opaque 500.
    if not isinstance(payload, dict) or "operations" not in payload:
        raise HTTPException(
            status_code=422,
            detail='Request body must be a JSON object with an "operations" field',
        )

    cs: ChangeSet = ChangeSet()
    cs.operations = payload["operations"]
    return execute_batch_commands(network, cs)
# response_model=None: keep FastAPI from deriving a response model from the
# ChangeSet return annotation.
@router.post("/compressedbatch/", response_model=None)
async def execute_compressed_batch_commands_endpoint(
    network: str, req: Request
) -> ChangeSet:
    """Execute the request body's operations via ``execute_batch_command``.

    The body must be a JSON object with an ``"operations"`` entry. That value
    is assigned to a fresh ``ChangeSet`` which is then passed to
    ``execute_batch_command`` (singular). No decompression happens in this
    handler: the body is parsed as plain JSON.

    Args:
        network: Network identifier, forwarded unchanged to the service layer.
        req: Raw request whose body carries the JSON payload.

    Returns:
        The value returned by ``execute_batch_command``.

    Raises:
        HTTPException: 400 when the body is not valid JSON; 422 when it is
            not a JSON object containing ``"operations"``.
    """
    # Local import keeps this handler self-contained with respect to the
    # module-level import list.
    from fastapi import HTTPException

    try:
        payload = await req.json()
    except ValueError as exc:
        # json.JSONDecodeError and UnicodeDecodeError are both ValueError
        # subclasses; report them as a client error instead of a 500.
        raise HTTPException(
            status_code=400, detail="Request body is not valid JSON"
        ) from exc

    # A non-object body or a missing key would otherwise raise
    # TypeError/KeyError and surface as an opaque 500.
    if not isinstance(payload, dict) or "operations" not in payload:
        raise HTTPException(
            status_code=422,
            detail='Request body must be a JSON object with an "operations" field',
        )

    cs: ChangeSet = ChangeSet()
    cs.operations = payload["operations"]
    return execute_batch_command(network, cs)
@router.get("/getrestoreoperation/")
async def get_restore_operation_endpoint(network: str) -> int:
    """Return the restore operation id that ``get_restore_operation`` reports.

    Args:
        network: Network identifier, forwarded unchanged to the service layer.

    Returns:
        The value produced by ``get_restore_operation``.
    """
    restore_operation_id = get_restore_operation(network)
    return restore_operation_id
@router.post("/setrestoreoperation/")
async def set_restore_operation_endpoint(
    network: str,
    operation: int,
) -> None:
    """Call ``set_restore_operation`` for ``network`` with ``operation``.

    Args:
        network: Network identifier, forwarded unchanged to the service layer.
        operation: Operation id forwarded unchanged to
            ``set_restore_operation``.

    Returns:
        The value returned by ``set_restore_operation`` (annotated as
        ``None``).
    """
    outcome = set_restore_operation(network, operation)
    return outcome