diff --git a/app/infra/db/dynamic_manager.py b/app/infra/db/dynamic_manager.py index 094cbec..e444a6e 100644 --- a/app/infra/db/dynamic_manager.py +++ b/app/infra/db/dynamic_manager.py @@ -24,6 +24,17 @@ logger = logging.getLogger(__name__) class PgEngineEntry: engine: AsyncEngine sessionmaker: async_sessionmaker[AsyncSession] + connection_url: str + pool_min_size: int + pool_max_size: int + + +@dataclass(frozen=True) +class PoolEntry: + pool: AsyncConnectionPool + connection_url: str + pool_min_size: int + pool_max_size: int @dataclass(frozen=True) @@ -35,8 +46,8 @@ class CacheKey: class ProjectConnectionManager: def __init__(self) -> None: self._pg_cache: Dict[CacheKey, PgEngineEntry] = OrderedDict() - self._ts_cache: Dict[CacheKey, AsyncConnectionPool] = OrderedDict() - self._pg_raw_cache: Dict[CacheKey, AsyncConnectionPool] = OrderedDict() + self._ts_cache: Dict[CacheKey, PoolEntry] = OrderedDict() + self._pg_raw_cache: Dict[CacheKey, PoolEntry] = OrderedDict() self._pg_lock = asyncio.Lock() self._ts_lock = asyncio.Lock() self._pg_raw_lock = asyncio.Lock() @@ -56,15 +67,29 @@ class ProjectConnectionManager: pool_max_size: int, ) -> async_sessionmaker[AsyncSession]: async with self._pg_lock: - key = CacheKey(project_id=project_id, db_role=db_role) - entry = self._pg_cache.get(key) - if entry: - self._pg_cache.move_to_end(key) - return entry.sessionmaker - normalized_url = self._normalize_pg_url(connection_url) pool_min_size = max(1, pool_min_size) pool_max_size = max(pool_min_size, pool_max_size) + + key = CacheKey(project_id=project_id, db_role=db_role) + entry = self._pg_cache.get(key) + if entry: + if ( + entry.connection_url == normalized_url + and entry.pool_min_size == pool_min_size + and entry.pool_max_size == pool_max_size + ): + self._pg_cache.move_to_end(key) + return entry.sessionmaker + + await entry.engine.dispose() + logger.info( + "Rebuilding PostgreSQL engine for project %s (%s) due to config change", + project_id, + db_role, + ) + self._pg_cache.pop(key, None) + engine = create_async_engine( normalized_url, pool_size=pool_min_size, @@ -75,6 +100,9 @@ class ProjectConnectionManager: self._pg_cache[key] = PgEngineEntry( engine=engine, sessionmaker=sessionmaker, + connection_url=normalized_url, + pool_min_size=pool_min_size, + pool_max_size=pool_max_size, ) await self._evict_pg_if_needed() logger.info( @@ -91,14 +119,28 @@ class ProjectConnectionManager: pool_max_size: int, ) -> AsyncConnectionPool: async with self._ts_lock: - key = CacheKey(project_id=project_id, db_role=db_role) - pool = self._ts_cache.get(key) - if pool: - self._ts_cache.move_to_end(key) - return pool - pool_min_size = max(1, pool_min_size) pool_max_size = max(pool_min_size, pool_max_size) + + key = CacheKey(project_id=project_id, db_role=db_role) + entry = self._ts_cache.get(key) + if entry: + if ( + entry.connection_url == connection_url + and entry.pool_min_size == pool_min_size + and entry.pool_max_size == pool_max_size + ): + self._ts_cache.move_to_end(key) + return entry.pool + + await entry.pool.close() + logger.info( + "Rebuilding TimescaleDB pool for project %s (%s) due to config change", + project_id, + db_role, + ) + self._ts_cache.pop(key, None) + pool = AsyncConnectionPool( conninfo=connection_url, min_size=pool_min_size, @@ -107,7 +149,12 @@ class ProjectConnectionManager: kwargs={"row_factory": dict_row}, ) await pool.open() - self._ts_cache[key] = pool + self._ts_cache[key] = PoolEntry( + pool=pool, + connection_url=connection_url, + pool_min_size=pool_min_size, + pool_max_size=pool_max_size, + ) await self._evict_ts_if_needed() logger.info( "Created TimescaleDB pool for project %s (%s)", project_id, db_role @@ -123,14 +170,28 @@ class ProjectConnectionManager: pool_max_size: int, ) -> AsyncConnectionPool: async with self._pg_raw_lock: - key = CacheKey(project_id=project_id, db_role=db_role) - pool = self._pg_raw_cache.get(key) - if pool: - self._pg_raw_cache.move_to_end(key) - return pool - pool_min_size = max(1, pool_min_size) pool_max_size = max(pool_min_size, pool_max_size) + + key = CacheKey(project_id=project_id, db_role=db_role) + entry = self._pg_raw_cache.get(key) + if entry: + if ( + entry.connection_url == connection_url + and entry.pool_min_size == pool_min_size + and entry.pool_max_size == pool_max_size + ): + self._pg_raw_cache.move_to_end(key) + return entry.pool + + await entry.pool.close() + logger.info( + "Rebuilding PostgreSQL pool for project %s (%s) due to config change", + project_id, + db_role, + ) + self._pg_raw_cache.pop(key, None) + pool = AsyncConnectionPool( conninfo=connection_url, min_size=pool_min_size, @@ -139,7 +200,12 @@ class ProjectConnectionManager: kwargs={"row_factory": dict_row}, ) await pool.open() - self._pg_raw_cache[key] = pool + self._pg_raw_cache[key] = PoolEntry( + pool=pool, + connection_url=connection_url, + pool_min_size=pool_min_size, + pool_max_size=pool_max_size, + ) await self._evict_pg_raw_if_needed() logger.info( "Created PostgreSQL pool for project %s (%s)", project_id, db_role @@ -158,8 +224,8 @@ class ProjectConnectionManager: async def _evict_ts_if_needed(self) -> None: while len(self._ts_cache) > settings.PROJECT_TS_CACHE_SIZE: - key, pool = self._ts_cache.popitem(last=False) - await pool.close() + key, entry = self._ts_cache.popitem(last=False) + await entry.pool.close() logger.info( "Evicted TimescaleDB pool for project %s (%s)", key.project_id, @@ -168,8 +234,8 @@ class ProjectConnectionManager: async def _evict_pg_raw_if_needed(self) -> None: while len(self._pg_raw_cache) > settings.PROJECT_PG_CACHE_SIZE: - key, pool = self._pg_raw_cache.popitem(last=False) - await pool.close() + key, entry = self._pg_raw_cache.popitem(last=False) + await entry.pool.close() logger.info( "Evicted PostgreSQL pool for project %s (%s)", key.project_id, @@ -188,8 +254,8 @@ class ProjectConnectionManager: self._pg_cache.clear() async with self._ts_lock: - for key, pool in list(self._ts_cache.items()): - await pool.close() + for key, entry in list(self._ts_cache.items()): + await entry.pool.close() logger.info( "Closed TimescaleDB pool for project %s (%s)", key.project_id, @@ -198,8 +264,8 @@ class ProjectConnectionManager: self._ts_cache.clear() async with self._pg_raw_lock: - for key, pool in list(self._pg_raw_cache.items()): - await pool.close() + for key, entry in list(self._pg_raw_cache.items()): + await entry.pool.close() logger.info( "Closed PostgreSQL pool for project %s (%s)", key.project_id,