Files
TJWaterServer/api/project.py
2025-02-07 23:14:21 +08:00

152 lines
5.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import os
import psycopg as pg
from psycopg.rows import dict_row
from .connection import g_conn_dict as conn
# no undo/redo
_server_databases = ['template0', 'template1', 'postgres', 'project']
def list_project() -> list[str]:
ps = []
with pg.connect(conninfo="dbname=postgres host=127.0.0.1", autocommit=True) as conn:
with conn.cursor(row_factory=dict_row) as cur:
for p in cur.execute(f"select datname from pg_database where datname <> 'postgres' and datname <> 'template0' and datname <> 'template1' and datname <> 'project'"):
ps.append(p['datname'])
return ps
def have_project(name: str) -> bool:
with pg.connect(conninfo="dbname=postgres host=127.0.0.1", autocommit=True) as conn:
with conn.cursor() as cur:
cur.execute(f"select * from pg_database where datname = '{name}'")
return cur.rowcount > 0
def copy_project(source: str, new: str) -> None:
with pg.connect(conninfo="dbname=postgres host=127.0.0.1", autocommit=True) as conn:
with conn.cursor() as cur:
cur.execute(f'create database "{new}" with template = {source}')
# 2025-02-07, WMH
# copyproject会把pg中operation这个表的全部内容也加进去我们实际项目运行一周后operation这个表会变得特别大导致CopyProject花费的时间很长CopyProjectEx把operation的在复制时没有一块复制过去节省时间
class CopyProjectEx:
@ staticmethod
def create_database(connection, new_db):
with connection.cursor() as cursor:
cursor.execute(f'create database "{new_db}"')
connection.commit()
@staticmethod
def execute_pg_dump(hostname, source_db, exclude_table_list):
dump_command_structure = (
f'pg_dump -h {hostname} -F c -s -f source_db_structure.dump {source_db}'
)
os.system(dump_command_structure)
if exclude_table_list is not None:
exclude_table = ' '.join(['-T {}'.format(i) for i in exclude_table_list])
dump_command_db = (
f'pg_dump -h {hostname} -F c -a {exclude_table} -f source_db.dump {source_db}'
)
else:
dump_command_db = (
f'pg_dump -h {hostname} -F c -a -f source_db.dump {source_db}'
)
os.system(dump_command_db)
@staticmethod
def execute_pg_restore(hostname, new_db):
restore_command_structure = (
f'pg_restore -h {hostname} -d {new_db} source_db_structure.dump'
)
os.system(restore_command_structure)
restore_command_db = (
f'pg_restore -h {hostname} -d {new_db} source_db.dump'
)
os.system(restore_command_db)
@staticmethod
def init_operation_table(connection, excluded_table):
with connection.cursor() as cursor:
if 'operation' in excluded_table:
insert_query \
= "insert into operation (id, redo, undo, redo_cs, undo_cs) values (0, '', '', '', '')"
cursor.execute(insert_query)
if 'current_operation' in excluded_table:
insert_query \
= "insert into current_operation (id) values (0)"
cursor.execute(insert_query)
if 'restore_operation' in excluded_table:
insert_query \
= "insert into restore_operation (id) values (0)"
cursor.execute(insert_query)
if 'batch_operation' in excluded_table:
insert_query \
= "insert into batch_operation (id, redo, undo, redo_cs, undo_cs) values (0, '', '', '', '')"
cursor.execute(insert_query)
if 'operation_table' in excluded_table:
insert_query \
= "insert into operation_table (option) values ('operation')"
cursor.execute(insert_query)
connection.commit()
def __call__(self, source: str, new: str, excluded_table: [str] = None) -> None:
connection = pg.connect(conninfo="dbname=postgres host=127.0.0.1", autocommit=True)
self.create_database(connection, new)
self.execute_pg_dump('127.0.0.1', source, excluded_table)
self.execute_pg_restore('127.0.0.1', new)
connection = pg.connect(conninfo=f"dbname='{new}' host=127.0.0.1", autocommit=True)
self.init_operation_table(connection, excluded_table)
def create_project(name: str) -> None:
return copy_project('project', name)
def delete_project(name: str) -> None:
with pg.connect(conninfo="dbname=postgres host=127.0.0.1", autocommit=True) as conn:
with conn.cursor() as cur:
cur.execute(f"select pg_terminate_backend(pid) from pg_stat_activity where datname = '{name}'")
cur.execute(f'drop database "{name}"')
def clean_project(excluded: list[str] = []) -> None:
projects = list_project()
with pg.connect(conninfo="dbname=postgres host=127.0.0.1", autocommit=True) as conn:
with conn.cursor(row_factory=dict_row) as cur:
row = cur.execute(f"select current_database()").fetchone()
if row != None:
current_db = row['current_database']
if current_db in projects:
projects.remove(current_db)
for project in projects:
if project in _server_databases or project in excluded:
continue
cur.execute(f"select pg_terminate_backend(pid) from pg_stat_activity where datname = '{project}'")
cur.execute(f'drop database "{project}"')
def open_project(name: str) -> None:
if name not in conn:
conn[name] = pg.connect(conninfo=f"dbname={name} host=127.0.0.1", autocommit=True)
def is_project_open(name: str) -> bool:
return name in conn
def close_project(name: str) -> None:
if name in conn:
conn[name].close()
del conn[name]