38 lines
1.1 KiB
Python
38 lines
1.1 KiB
Python
import asyncio

from fastapi import APIRouter

from app.infra.cache.redis_client import redis_client
|
|
|
|
# Router that the Redis maintenance endpoints in this module register on.
router = APIRouter()
|
|
|
|
@router.post("/clearrediskey/")
async def fastapi_clear_redis_key(key: str):
    """Delete a single Redis key.

    Args:
        key: Exact name of the key to delete; passed unchanged to
            ``redis_client.delete``.

    Returns:
        Always ``True``. The outcome of the delete is not reported, so a
        key that did not exist also yields ``True``.
    """
    # The client call is synchronous; run it in a worker thread so the
    # event loop is not stalled for the duration of the round trip.
    await asyncio.to_thread(redis_client.delete, key)
    return True
|
|
|
|
|
|
def _delete_keys_matching(pattern, batch_size=500):
    """Delete every key matching the glob *pattern*, in bounded batches."""
    pending = []
    # SCAN walks the keyspace incrementally instead of issuing one KEYS
    # command that blocks the server while it visits every key.
    for name in redis_client.scan_iter(match=pattern, count=1000):
        pending.append(name)
        if len(pending) >= batch_size:
            redis_client.delete(*pending)
            pending.clear()
    if pending:
        redis_client.delete(*pending)


@router.post("/clearrediskeys/")
async def fastapi_clear_redis_keys(keys: str):
    """Delete every Redis key whose name contains ``keys``.

    The value is embedded in the glob pattern ``*{keys}*`` without
    escaping, so glob metacharacters in it (``*``, ``?``, ``[...]``) are
    interpreted as such. An empty string produces ``**``, which matches
    every key in the current database.

    Args:
        keys: Substring (or glob fragment) to look for in key names.

    Returns:
        Always ``True``, whether or not any key matched.
    """
    # The scan-and-delete loop is synchronous; keep it off the event loop.
    await asyncio.to_thread(_delete_keys_matching, f"*{keys}*")
    return True
|
|
|
|
|
|
@router.post("/clearallredis/")
async def fastapi_clear_all_redis():
    """Remove every key by calling ``redis_client.flushdb()``.

    Returns:
        Always ``True``.
    """
    # flushdb() is a synchronous call; run it in a worker thread so other
    # requests keep being served while it completes.
    await asyncio.to_thread(redis_client.flushdb)
    return True
|
|
|
|
|
|
def _list_redis_keys():
    """Return all key names as ``str``, each listed once."""
    # SCAN may yield the same key more than once; dict.fromkeys drops the
    # repeats while keeping first-seen order.
    unique = dict.fromkeys(redis_client.scan_iter(match="*", count=1000))
    # Key names come back as bytes unless the client decodes responses.
    # errors="replace" keeps a non-UTF-8 key name from failing the request.
    return [
        name.decode("utf-8", errors="replace") if isinstance(name, bytes) else name
        for name in unique
    ]


@router.get("/queryredis/")
async def fastapi_query_redis():
    """List the names of all keys visible to ``redis_client``.

    Returns:
        A list of key names as strings. Names received as ``bytes`` are
        decoded as UTF-8, with undecodable bytes replaced, so the response
        is JSON-serialisable. Order is not significant.
    """
    # The keyspace walk is synchronous; keep it off the event loop.
    return await asyncio.to_thread(_list_redis_keys)
|