DSN 复用已有连接池
This commit is contained in:
@@ -24,6 +24,17 @@ logger = logging.getLogger(__name__)
|
|||||||
class PgEngineEntry:
|
class PgEngineEntry:
|
||||||
engine: AsyncEngine
|
engine: AsyncEngine
|
||||||
sessionmaker: async_sessionmaker[AsyncSession]
|
sessionmaker: async_sessionmaker[AsyncSession]
|
||||||
|
connection_url: str
|
||||||
|
pool_min_size: int
|
||||||
|
pool_max_size: int
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass(frozen=True)
|
||||||
|
class PoolEntry:
|
||||||
|
pool: AsyncConnectionPool
|
||||||
|
connection_url: str
|
||||||
|
pool_min_size: int
|
||||||
|
pool_max_size: int
|
||||||
|
|
||||||
|
|
||||||
@dataclass(frozen=True)
|
@dataclass(frozen=True)
|
||||||
@@ -35,8 +46,8 @@ class CacheKey:
|
|||||||
class ProjectConnectionManager:
|
class ProjectConnectionManager:
|
||||||
def __init__(self) -> None:
|
def __init__(self) -> None:
|
||||||
self._pg_cache: Dict[CacheKey, PgEngineEntry] = OrderedDict()
|
self._pg_cache: Dict[CacheKey, PgEngineEntry] = OrderedDict()
|
||||||
self._ts_cache: Dict[CacheKey, AsyncConnectionPool] = OrderedDict()
|
self._ts_cache: Dict[CacheKey, PoolEntry] = OrderedDict()
|
||||||
self._pg_raw_cache: Dict[CacheKey, AsyncConnectionPool] = OrderedDict()
|
self._pg_raw_cache: Dict[CacheKey, PoolEntry] = OrderedDict()
|
||||||
self._pg_lock = asyncio.Lock()
|
self._pg_lock = asyncio.Lock()
|
||||||
self._ts_lock = asyncio.Lock()
|
self._ts_lock = asyncio.Lock()
|
||||||
self._pg_raw_lock = asyncio.Lock()
|
self._pg_raw_lock = asyncio.Lock()
|
||||||
@@ -56,15 +67,29 @@ class ProjectConnectionManager:
|
|||||||
pool_max_size: int,
|
pool_max_size: int,
|
||||||
) -> async_sessionmaker[AsyncSession]:
|
) -> async_sessionmaker[AsyncSession]:
|
||||||
async with self._pg_lock:
|
async with self._pg_lock:
|
||||||
key = CacheKey(project_id=project_id, db_role=db_role)
|
|
||||||
entry = self._pg_cache.get(key)
|
|
||||||
if entry:
|
|
||||||
self._pg_cache.move_to_end(key)
|
|
||||||
return entry.sessionmaker
|
|
||||||
|
|
||||||
normalized_url = self._normalize_pg_url(connection_url)
|
normalized_url = self._normalize_pg_url(connection_url)
|
||||||
pool_min_size = max(1, pool_min_size)
|
pool_min_size = max(1, pool_min_size)
|
||||||
pool_max_size = max(pool_min_size, pool_max_size)
|
pool_max_size = max(pool_min_size, pool_max_size)
|
||||||
|
|
||||||
|
key = CacheKey(project_id=project_id, db_role=db_role)
|
||||||
|
entry = self._pg_cache.get(key)
|
||||||
|
if entry:
|
||||||
|
if (
|
||||||
|
entry.connection_url == normalized_url
|
||||||
|
and entry.pool_min_size == pool_min_size
|
||||||
|
and entry.pool_max_size == pool_max_size
|
||||||
|
):
|
||||||
|
self._pg_cache.move_to_end(key)
|
||||||
|
return entry.sessionmaker
|
||||||
|
|
||||||
|
await entry.engine.dispose()
|
||||||
|
logger.info(
|
||||||
|
"Rebuilding PostgreSQL engine for project %s (%s) due to config change",
|
||||||
|
project_id,
|
||||||
|
db_role,
|
||||||
|
)
|
||||||
|
self._pg_cache.pop(key, None)
|
||||||
|
|
||||||
engine = create_async_engine(
|
engine = create_async_engine(
|
||||||
normalized_url,
|
normalized_url,
|
||||||
pool_size=pool_min_size,
|
pool_size=pool_min_size,
|
||||||
@@ -75,6 +100,9 @@ class ProjectConnectionManager:
|
|||||||
self._pg_cache[key] = PgEngineEntry(
|
self._pg_cache[key] = PgEngineEntry(
|
||||||
engine=engine,
|
engine=engine,
|
||||||
sessionmaker=sessionmaker,
|
sessionmaker=sessionmaker,
|
||||||
|
connection_url=normalized_url,
|
||||||
|
pool_min_size=pool_min_size,
|
||||||
|
pool_max_size=pool_max_size,
|
||||||
)
|
)
|
||||||
await self._evict_pg_if_needed()
|
await self._evict_pg_if_needed()
|
||||||
logger.info(
|
logger.info(
|
||||||
@@ -91,14 +119,28 @@ class ProjectConnectionManager:
|
|||||||
pool_max_size: int,
|
pool_max_size: int,
|
||||||
) -> AsyncConnectionPool:
|
) -> AsyncConnectionPool:
|
||||||
async with self._ts_lock:
|
async with self._ts_lock:
|
||||||
key = CacheKey(project_id=project_id, db_role=db_role)
|
|
||||||
pool = self._ts_cache.get(key)
|
|
||||||
if pool:
|
|
||||||
self._ts_cache.move_to_end(key)
|
|
||||||
return pool
|
|
||||||
|
|
||||||
pool_min_size = max(1, pool_min_size)
|
pool_min_size = max(1, pool_min_size)
|
||||||
pool_max_size = max(pool_min_size, pool_max_size)
|
pool_max_size = max(pool_min_size, pool_max_size)
|
||||||
|
|
||||||
|
key = CacheKey(project_id=project_id, db_role=db_role)
|
||||||
|
entry = self._ts_cache.get(key)
|
||||||
|
if entry:
|
||||||
|
if (
|
||||||
|
entry.connection_url == connection_url
|
||||||
|
and entry.pool_min_size == pool_min_size
|
||||||
|
and entry.pool_max_size == pool_max_size
|
||||||
|
):
|
||||||
|
self._ts_cache.move_to_end(key)
|
||||||
|
return entry.pool
|
||||||
|
|
||||||
|
await entry.pool.close()
|
||||||
|
logger.info(
|
||||||
|
"Rebuilding TimescaleDB pool for project %s (%s) due to config change",
|
||||||
|
project_id,
|
||||||
|
db_role,
|
||||||
|
)
|
||||||
|
self._ts_cache.pop(key, None)
|
||||||
|
|
||||||
pool = AsyncConnectionPool(
|
pool = AsyncConnectionPool(
|
||||||
conninfo=connection_url,
|
conninfo=connection_url,
|
||||||
min_size=pool_min_size,
|
min_size=pool_min_size,
|
||||||
@@ -107,7 +149,12 @@ class ProjectConnectionManager:
|
|||||||
kwargs={"row_factory": dict_row},
|
kwargs={"row_factory": dict_row},
|
||||||
)
|
)
|
||||||
await pool.open()
|
await pool.open()
|
||||||
self._ts_cache[key] = pool
|
self._ts_cache[key] = PoolEntry(
|
||||||
|
pool=pool,
|
||||||
|
connection_url=connection_url,
|
||||||
|
pool_min_size=pool_min_size,
|
||||||
|
pool_max_size=pool_max_size,
|
||||||
|
)
|
||||||
await self._evict_ts_if_needed()
|
await self._evict_ts_if_needed()
|
||||||
logger.info(
|
logger.info(
|
||||||
"Created TimescaleDB pool for project %s (%s)", project_id, db_role
|
"Created TimescaleDB pool for project %s (%s)", project_id, db_role
|
||||||
@@ -123,14 +170,28 @@ class ProjectConnectionManager:
|
|||||||
pool_max_size: int,
|
pool_max_size: int,
|
||||||
) -> AsyncConnectionPool:
|
) -> AsyncConnectionPool:
|
||||||
async with self._pg_raw_lock:
|
async with self._pg_raw_lock:
|
||||||
key = CacheKey(project_id=project_id, db_role=db_role)
|
|
||||||
pool = self._pg_raw_cache.get(key)
|
|
||||||
if pool:
|
|
||||||
self._pg_raw_cache.move_to_end(key)
|
|
||||||
return pool
|
|
||||||
|
|
||||||
pool_min_size = max(1, pool_min_size)
|
pool_min_size = max(1, pool_min_size)
|
||||||
pool_max_size = max(pool_min_size, pool_max_size)
|
pool_max_size = max(pool_min_size, pool_max_size)
|
||||||
|
|
||||||
|
key = CacheKey(project_id=project_id, db_role=db_role)
|
||||||
|
entry = self._pg_raw_cache.get(key)
|
||||||
|
if entry:
|
||||||
|
if (
|
||||||
|
entry.connection_url == connection_url
|
||||||
|
and entry.pool_min_size == pool_min_size
|
||||||
|
and entry.pool_max_size == pool_max_size
|
||||||
|
):
|
||||||
|
self._pg_raw_cache.move_to_end(key)
|
||||||
|
return entry.pool
|
||||||
|
|
||||||
|
await entry.pool.close()
|
||||||
|
logger.info(
|
||||||
|
"Rebuilding PostgreSQL pool for project %s (%s) due to config change",
|
||||||
|
project_id,
|
||||||
|
db_role,
|
||||||
|
)
|
||||||
|
self._pg_raw_cache.pop(key, None)
|
||||||
|
|
||||||
pool = AsyncConnectionPool(
|
pool = AsyncConnectionPool(
|
||||||
conninfo=connection_url,
|
conninfo=connection_url,
|
||||||
min_size=pool_min_size,
|
min_size=pool_min_size,
|
||||||
@@ -139,7 +200,12 @@ class ProjectConnectionManager:
|
|||||||
kwargs={"row_factory": dict_row},
|
kwargs={"row_factory": dict_row},
|
||||||
)
|
)
|
||||||
await pool.open()
|
await pool.open()
|
||||||
self._pg_raw_cache[key] = pool
|
self._pg_raw_cache[key] = PoolEntry(
|
||||||
|
pool=pool,
|
||||||
|
connection_url=connection_url,
|
||||||
|
pool_min_size=pool_min_size,
|
||||||
|
pool_max_size=pool_max_size,
|
||||||
|
)
|
||||||
await self._evict_pg_raw_if_needed()
|
await self._evict_pg_raw_if_needed()
|
||||||
logger.info(
|
logger.info(
|
||||||
"Created PostgreSQL pool for project %s (%s)", project_id, db_role
|
"Created PostgreSQL pool for project %s (%s)", project_id, db_role
|
||||||
@@ -158,8 +224,8 @@ class ProjectConnectionManager:
|
|||||||
|
|
||||||
async def _evict_ts_if_needed(self) -> None:
|
async def _evict_ts_if_needed(self) -> None:
|
||||||
while len(self._ts_cache) > settings.PROJECT_TS_CACHE_SIZE:
|
while len(self._ts_cache) > settings.PROJECT_TS_CACHE_SIZE:
|
||||||
key, pool = self._ts_cache.popitem(last=False)
|
key, entry = self._ts_cache.popitem(last=False)
|
||||||
await pool.close()
|
await entry.pool.close()
|
||||||
logger.info(
|
logger.info(
|
||||||
"Evicted TimescaleDB pool for project %s (%s)",
|
"Evicted TimescaleDB pool for project %s (%s)",
|
||||||
key.project_id,
|
key.project_id,
|
||||||
@@ -168,8 +234,8 @@ class ProjectConnectionManager:
|
|||||||
|
|
||||||
async def _evict_pg_raw_if_needed(self) -> None:
|
async def _evict_pg_raw_if_needed(self) -> None:
|
||||||
while len(self._pg_raw_cache) > settings.PROJECT_PG_CACHE_SIZE:
|
while len(self._pg_raw_cache) > settings.PROJECT_PG_CACHE_SIZE:
|
||||||
key, pool = self._pg_raw_cache.popitem(last=False)
|
key, entry = self._pg_raw_cache.popitem(last=False)
|
||||||
await pool.close()
|
await entry.pool.close()
|
||||||
logger.info(
|
logger.info(
|
||||||
"Evicted PostgreSQL pool for project %s (%s)",
|
"Evicted PostgreSQL pool for project %s (%s)",
|
||||||
key.project_id,
|
key.project_id,
|
||||||
@@ -188,8 +254,8 @@ class ProjectConnectionManager:
|
|||||||
self._pg_cache.clear()
|
self._pg_cache.clear()
|
||||||
|
|
||||||
async with self._ts_lock:
|
async with self._ts_lock:
|
||||||
for key, pool in list(self._ts_cache.items()):
|
for key, entry in list(self._ts_cache.items()):
|
||||||
await pool.close()
|
await entry.pool.close()
|
||||||
logger.info(
|
logger.info(
|
||||||
"Closed TimescaleDB pool for project %s (%s)",
|
"Closed TimescaleDB pool for project %s (%s)",
|
||||||
key.project_id,
|
key.project_id,
|
||||||
@@ -198,8 +264,8 @@ class ProjectConnectionManager:
|
|||||||
self._ts_cache.clear()
|
self._ts_cache.clear()
|
||||||
|
|
||||||
async with self._pg_raw_lock:
|
async with self._pg_raw_lock:
|
||||||
for key, pool in list(self._pg_raw_cache.items()):
|
for key, entry in list(self._pg_raw_cache.items()):
|
||||||
await pool.close()
|
await entry.pool.close()
|
||||||
logger.info(
|
logger.info(
|
||||||
"Closed PostgreSQL pool for project %s (%s)",
|
"Closed PostgreSQL pool for project %s (%s)",
|
||||||
key.project_id,
|
key.project_id,
|
||||||
|
|||||||
Reference in New Issue
Block a user