import os import psycopg as pg from psycopg.rows import dict_row from .connection import g_conn_dict as conn from .postgresql_info import get_pgconn_string, get_pg_config, get_pg_password # no undo/redo _server_databases = ["template0", "template1", "postgres", "project"] def list_project() -> list[str]: ps = [] with pg.connect(conninfo=get_pgconn_string(), autocommit=True) as conn: with conn.cursor(row_factory=dict_row) as cur: for p in cur.execute( f"select datname from pg_database where datname <> 'postgres' and datname <> 'template0' and datname <> 'template1' and datname <> 'project'" ): ps.append(p["datname"]) return ps def have_project(name: str) -> bool: with pg.connect(conninfo=get_pgconn_string(), autocommit=True) as conn: with conn.cursor() as cur: cur.execute(f"select * from pg_database where datname = '{name}'") return cur.rowcount > 0 def copy_project(source: str, new: str) -> None: with pg.connect(conninfo=get_pgconn_string(), autocommit=True) as conn: with conn.cursor() as cur: cur.execute(f'create database "{new}" with template = {source}') # 2025-02-07, WMH # copyproject会把pg中operation这个表的全部内容也加进去,我们实际项目运行一周后operation这个表会变得特别大,导致CopyProject花费的时间很长,CopyProjectEx把operation的在复制时没有一块复制过去,节省时间 class CopyProjectEx: @staticmethod def create_database(connection, new_db): with connection.cursor() as cursor: cursor.execute(f'create database "{new_db}"') connection.commit() @staticmethod def execute_pg_dump(source_db, exclude_table_list): os.environ["PGPASSWORD"] = get_pg_password() # 设置密码环境变量 pg_config = get_pg_config() host = pg_config["host"] port = pg_config["port"] user = pg_config["user"] dump_command_structure = f"pg_dump -h {host} -p {port} -U {user} -F c -s -f source_db_structure.dump {source_db}" os.system(dump_command_structure) if exclude_table_list is not None: exclude_table = " ".join(["-T {}".format(i) for i in exclude_table_list]) dump_command_db = f"pg_dump -h {host} -p {port} -U {user} -F c -a {exclude_table} -f source_db.dump {source_db}" else: dump_command_db = f"pg_dump -h {host} -p {port} -U {user} -F c -a -f source_db.dump {source_db}" os.system(dump_command_db) @staticmethod def execute_pg_restore(new_db): os.environ["PGPASSWORD"] = get_pg_password() # 设置密码环境变量 pg_config = get_pg_config() host = pg_config["host"] port = pg_config["port"] user = pg_config["user"] restore_command_structure = f"pg_restore -h {host} -p {port} -U {user} -d {new_db} source_db_structure.dump" os.system(restore_command_structure) restore_command_db = ( f"pg_restore -h {host} -p {port} -U {user} -d {new_db} source_db.dump" ) os.system(restore_command_db) @staticmethod def init_operation_table(connection, excluded_table): with connection.cursor() as cursor: if "operation" in excluded_table: insert_query = "insert into operation (id, redo, undo, redo_cs, undo_cs) values (0, '', '', '', '')" cursor.execute(insert_query) if "current_operation" in excluded_table: insert_query = "insert into current_operation (id) values (0)" cursor.execute(insert_query) if "restore_operation" in excluded_table: insert_query = "insert into restore_operation (id) values (0)" cursor.execute(insert_query) if "batch_operation" in excluded_table: insert_query = "insert into batch_operation (id, redo, undo, redo_cs, undo_cs) values (0, '', '', '', '')" cursor.execute(insert_query) if "operation_table" in excluded_table: insert_query = ( "insert into operation_table (option) values ('operation')" ) cursor.execute(insert_query) connection.commit() def __call__(self, source: str, new_db: str, excluded_tables: [str] = None) -> None: source_connection = pg.connect(conninfo=get_pgconn_string(), autocommit=True) self.create_database(source_connection, new_db) self.execute_pg_dump(source, excluded_tables) self.execute_pg_restore(new_db) source_connection.close() new_db_connection = pg.connect( conninfo=get_pgconn_string(db_name=new_db), autocommit=True ) self.init_operation_table(new_db_connection, excluded_tables) new_db_connection.close() def create_project(name: str) -> None: return copy_project("project", name) def delete_project(name: str) -> None: with pg.connect(conninfo=get_pgconn_string(), autocommit=True) as conn: with conn.cursor() as cur: cur.execute( f"select pg_terminate_backend(pid) from pg_stat_activity where datname = '{name}'" ) cur.execute(f'drop database "{name}"') def clean_project(excluded: list[str] = []) -> None: projects = list_project() with pg.connect(conninfo=get_pgconn_string(), autocommit=True) as conn: with conn.cursor(row_factory=dict_row) as cur: row = cur.execute(f"select current_database()").fetchone() if row != None: current_db = row["current_database"] if current_db in projects: projects.remove(current_db) for project in projects: if project in _server_databases or project in excluded: continue cur.execute( f"select pg_terminate_backend(pid) from pg_stat_activity where datname = '{project}'" ) cur.execute(f'drop database "{project}"') def open_project(name: str) -> None: if name not in conn: conn[name] = pg.connect( conninfo=get_pgconn_string(db_name=name), autocommit=True ) def is_project_open(name: str) -> bool: return name in conn def close_project(name: str) -> None: if name in conn: conn[name].close() del conn[name]