import os import psycopg as pg from psycopg.rows import dict_row from .connection import g_conn_dict as conn # no undo/redo _server_databases = ['template0', 'template1', 'postgres', 'project'] def list_project() -> list[str]: ps = [] with pg.connect(conninfo="dbname=postgres host=127.0.0.1", autocommit=True) as conn: with conn.cursor(row_factory=dict_row) as cur: for p in cur.execute(f"select datname from pg_database where datname <> 'postgres' and datname <> 'template0' and datname <> 'template1' and datname <> 'project'"): ps.append(p['datname']) return ps def have_project(name: str) -> bool: with pg.connect(conninfo="dbname=postgres host=127.0.0.1", autocommit=True) as conn: with conn.cursor() as cur: cur.execute(f"select * from pg_database where datname = '{name}'") return cur.rowcount > 0 def copy_project(source: str, new: str) -> None: with pg.connect(conninfo="dbname=postgres host=127.0.0.1", autocommit=True) as conn: with conn.cursor() as cur: cur.execute(f'create database "{new}" with template = {source}') # 2025-02-07, WMH # copyproject会把pg中operation这个表的全部内容也加进去,我们实际项目运行一周后operation这个表会变得特别大,导致CopyProject花费的时间很长,CopyProjectEx把operation的在复制时没有一块复制过去,节省时间 class CopyProjectEx: @ staticmethod def create_database(connection, new_db): with connection.cursor() as cursor: cursor.execute(f'create database "{new_db}"') connection.commit() @staticmethod def execute_pg_dump(hostname, source_db, exclude_table_list): dump_command_structure = ( f'pg_dump -h {hostname} -F c -s -f source_db_structure.dump {source_db}' ) os.system(dump_command_structure) if exclude_table_list is not None: exclude_table = ' '.join(['-T {}'.format(i) for i in exclude_table_list]) dump_command_db = ( f'pg_dump -h {hostname} -F c -a {exclude_table} -f source_db.dump {source_db}' ) else: dump_command_db = ( f'pg_dump -h {hostname} -F c -a -f source_db.dump {source_db}' ) os.system(dump_command_db) @staticmethod def execute_pg_restore(hostname, new_db): restore_command_structure = ( f'pg_restore -h {hostname} -d {new_db} source_db_structure.dump' ) os.system(restore_command_structure) restore_command_db = ( f'pg_restore -h {hostname} -d {new_db} source_db.dump' ) os.system(restore_command_db) @staticmethod def init_operation_table(connection, excluded_table): with connection.cursor() as cursor: if 'operation' in excluded_table: insert_query \ = "insert into operation (id, redo, undo, redo_cs, undo_cs) values (0, '', '', '', '')" cursor.execute(insert_query) if 'current_operation' in excluded_table: insert_query \ = "insert into current_operation (id) values (0)" cursor.execute(insert_query) if 'restore_operation' in excluded_table: insert_query \ = "insert into restore_operation (id) values (0)" cursor.execute(insert_query) if 'batch_operation' in excluded_table: insert_query \ = "insert into batch_operation (id, redo, undo, redo_cs, undo_cs) values (0, '', '', '', '')" cursor.execute(insert_query) if 'operation_table' in excluded_table: insert_query \ = "insert into operation_table (option) values ('operation')" cursor.execute(insert_query) connection.commit() def __call__(self, source: str, new: str, excluded_table: [str] = None) -> None: connection = pg.connect(conninfo="dbname=postgres host=127.0.0.1", autocommit=True) self.create_database(connection, new) self.execute_pg_dump('127.0.0.1', source, excluded_table) self.execute_pg_restore('127.0.0.1', new) connection = pg.connect(conninfo=f"dbname='{new}' host=127.0.0.1", autocommit=True) self.init_operation_table(connection, excluded_table) def create_project(name: str) -> None: return copy_project('project', name) def delete_project(name: str) -> None: with pg.connect(conninfo="dbname=postgres host=127.0.0.1", autocommit=True) as conn: with conn.cursor() as cur: cur.execute(f"select pg_terminate_backend(pid) from pg_stat_activity where datname = '{name}'") cur.execute(f'drop database "{name}"') def clean_project(excluded: list[str] = []) -> None: projects = list_project() with pg.connect(conninfo="dbname=postgres host=127.0.0.1", autocommit=True) as conn: with conn.cursor(row_factory=dict_row) as cur: row = cur.execute(f"select current_database()").fetchone() if row != None: current_db = row['current_database'] if current_db in projects: projects.remove(current_db) for project in projects: if project in _server_databases or project in excluded: continue cur.execute(f"select pg_terminate_backend(pid) from pg_stat_activity where datname = '{project}'") cur.execute(f'drop database "{project}"') def open_project(name: str) -> None: if name not in conn: conn[name] = pg.connect(conninfo=f"dbname={name} host=127.0.0.1", autocommit=True) def is_project_open(name: str) -> bool: return name in conn def close_project(name: str) -> None: if name in conn: conn[name].close() del conn[name]