Files
TJWaterServerBinary/timescaledb/database.py

56 lines
1.7 KiB
Python

import logging
from contextlib import asynccontextmanager
from typing import AsyncGenerator
import psycopg_pool
from psycopg.rows import dict_row
import timescaledb.timescaledb_info as timescaledb_info
# Module-level logger for this module
logger = logging.getLogger(__name__)
class Database:
    """Own the async TimescaleDB connection pool and manage its lifecycle.

    Typical order of use: ``init_pool()`` to build the pool, ``open()`` to
    open it, ``get_connection()`` to borrow connections, and ``close()`` to
    shut the pool down.
    """

    def __init__(self):
        # Stays None until init_pool() has built the pool.
        self.pool = None

    def init_pool(self):
        """Build the connection pool without opening it.

        The connection string comes from ``timescaledb_info.get_pgconn_string()``.
        The pool is created with ``open=False``, so ``open()`` must be awaited
        before connections are requested.

        Raises:
            Exception: Whatever pool construction raised; it is logged with its
                traceback and then re-raised unchanged.
        """
        conn_string = timescaledb_info.get_pgconn_string()
        try:
            self.pool = psycopg_pool.AsyncConnectionPool(
                conninfo=conn_string,
                min_size=1,
                max_size=20,
                open=False,  # Don't open immediately, wait for startup
                kwargs={"row_factory": dict_row},  # Return rows as dictionaries
            )
        except Exception as e:
            # logger.exception keeps the same message but also records the traceback.
            logger.exception(
                "Failed to initialize TimescaleDB connection pool: %s", e
            )
            raise
        else:
            logger.info("TimescaleDB connection pool initialized.")

    async def open(self):
        """Open the connection pool; do nothing if it was never initialized."""
        if self.pool is not None:
            await self.pool.open()

    async def close(self):
        """Close the connection pool; do nothing if it was never initialized."""
        if self.pool is not None:
            await self.pool.close()
            logger.info("TimescaleDB connection pool closed.")

    @asynccontextmanager
    async def get_connection(self) -> AsyncGenerator:
        """Yield a connection borrowed from the pool.

        The connection is obtained through the pool's own ``connection()``
        context manager, which is exited when the caller's ``async with``
        block ends.

        Raises:
            RuntimeError: If ``init_pool()`` has not created the pool yet.
        """
        if self.pool is None:
            # RuntimeError is still caught by callers using `except Exception`.
            raise RuntimeError("Database pool is not initialized.")
        async with self.pool.connection() as conn:
            yield conn
# Shared module-level Database instance; its pool stays None until init_pool() is called.
db = Database()
async def get_db_connection():
    """Async-generator dependency for FastAPI that yields one pooled connection.

    The connection is borrowed from the module-level ``db`` instance and is
    held until the generator is resumed or closed after the ``yield``.
    """
    connection_cm = db.get_connection()
    async with connection_cm as connection:
        yield connection