I needed a way to refresh a PostgreSQL logical replication subscription on many hosts, so I wrote a Python script that reads in a list of hostnames and executes the refresh against each host. The subscription names are derived from the host names, and I have a master server from which I can run psql against each host as postgres.
Here are the contents of the host file:
1 2 3 4 5 |
[postgres@dbadmin ~]$ cat contentdb_subscribed_hosts.txt
crmrepdb-c
crmrepdb-j
maindb-a
maindb-c
Here is what the logical replication subscriptions look like:
1 2 3 4 |
crmrepdb_c_subscription01
crmrepdb_j_subscription01
maindb_a_subscription01
maindb_c_subscription01
Here is the python code:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 |
#!/usr/bin/python3.11
"""Refresh the logical replication subscription on every host in a host file."""
from utils import PostgresDB


def _subscription_name(hostname):
    """Return the subscription name that belongs to *hostname*.

    Every hyphen in the hostname becomes an underscore and
    ``_subscription01`` is appended, e.g. ``crmrepdb-c`` ->
    ``crmrepdb_c_subscription01``.
    """
    # NOTE(review): hostnames with more than one hyphen previously lost
    # everything after the second part; replacing every hyphen is assumed to
    # be the intended naming rule -- confirm against the real subscriptions.
    return f"{hostname.replace('-', '_')}_subscription01"


def run_query(hosts_file='contentdb_subscribed_hosts.txt'):
    """Run ``ALTER SUBSCRIPTION ... REFRESH PUBLICATION`` on each listed host.

    Args:
        hosts_file: Path of a text file holding one hostname per line.
            Defaults to ``contentdb_subscribed_hosts.txt`` in the current
            working directory.

    Blank lines in the file are skipped.  An error on one host is printed and
    the loop continues with the next host, so a single unreachable server no
    longer prevents the remaining subscriptions from being refreshed.  If the
    host file itself cannot be read, the error is printed and nothing is run.
    """
    try:
        with open(hosts_file, "r") as file:
            hostnames = [line.strip() for line in file]
    except OSError as e:
        print(f"Error: {e}")
        return

    for hostname in hostnames:
        if not hostname:
            # An empty line would otherwise yield "_subscription01" and a
            # connection attempt with an empty host name.
            continue

        # The progress message is kept exactly as before: hyphenated hosts
        # are shown as their split parts, plain hosts as the bare name.
        if "-" in hostname:
            print(f"executing refresh against host: {hostname.split('-')}")
        else:
            print(f"executing refresh against host: {hostname}")

        sql_command = (
            f"ALTER SUBSCRIPTION "
            f"{_subscription_name(hostname)} "
            f"REFRESH PUBLICATION;"
        )

        # Top-level boundary for one host: report the failure and move on.
        try:
            db = PostgresDB.PostgresDB(
                host=hostname,
                database="famnet5",
                user="postgres",
                port=5432,
                password='',
            )
            try:
                db.connect()
                db.execute_query(sql_command)
            finally:
                db.close()
        except Exception as e:
            print(f"Error: {e}")


if __name__ == "__main__":
    run_query()
Here are the contents of the utils lib:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 |
import psycopg2
from psycopg2 import sql, OperationalError, DatabaseError


class PostgresDB:
    """Thin wrapper around a single psycopg2 connection in autocommit mode.

    Use it explicitly (``connect()`` ... ``close()``) or as a context
    manager, which connects on entry and closes the connection on exit.
    """

    def __init__(self, host, database, user, password, port=5432):
        """
        Initialize the database connection parameters.

        No connection is opened here; call ``connect()`` (or enter the
        object as a context manager) before running queries.
        """
        self.host = host
        self.database = database
        self.user = user
        self.password = password
        self.port = port
        self.connection = None

    def __enter__(self):
        """Open the connection and return this object."""
        self.connect()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the connection; exceptions from the body are not suppressed."""
        self.close()
        return False

    def _require_connection(self):
        """Return the current connection or raise if none has been opened."""
        if self.connection is None:
            raise psycopg2.InterfaceError(
                "Not connected to the database; call connect() first."
            )
        return self.connection

    def connect(self):
        """
        Establish a connection to the PostgreSQL database.

        The connection is switched to autocommit, so every statement takes
        effect immediately without an explicit transaction.

        Raises:
            OperationalError: if the connection cannot be established (the
                error is printed first, then re-raised).
        """
        try:
            self.connection = psycopg2.connect(
                host=self.host,
                database=self.database,
                user=self.user,
                password=self.password,
                port=self.port
            )
            self.connection.autocommit = True
            print("Database connection established.")
        except OperationalError as e:
            print(f"Error connecting to database: {e}")
            raise

    def execute_query(self, query, params=None):
        """
        Execute a statement that returns no rows.

        Args:
            query: SQL text passed to ``cursor.execute``.
            params: Optional parameters bound to the query.

        Raises:
            psycopg2.InterfaceError: if no connection is open.
            DatabaseError: if the statement fails (printed, then re-raised).
        """
        connection = self._require_connection()
        try:
            with connection.cursor() as cursor:
                cursor.execute(query, params)
                connection.commit()
                print("Query executed successfully.")
        except DatabaseError as e:
            # rollback() on a closed or broken connection raises and would
            # hide the original error, so only roll back while it is usable.
            if not connection.closed:
                connection.rollback()
            print(f"Error executing query: {e}")
            raise

    def fetch_data(self, query, params=None):
        """
        Execute a query and return all of its rows.

        Args:
            query: SQL text passed to ``cursor.execute``.
            params: Optional parameters bound to the query.

        Returns:
            The result of ``cursor.fetchall()``.

        Raises:
            psycopg2.InterfaceError: if no connection is open.
            DatabaseError: if the query fails (printed, then re-raised).
        """
        connection = self._require_connection()
        try:
            with connection.cursor() as cursor:
                cursor.execute(query, params)
                result = cursor.fetchall()
                return result
        except DatabaseError as e:
            print(f"Error fetching data: {e}")
            raise

    def close(self):
        """
        Close the connection if one is open; safe to call more than once.

        The stored connection is reset to ``None`` so that later calls see
        the object as disconnected.
        """
        if self.connection:
            try:
                self.connection.close()
            finally:
                # Drop the reference even if close() fails, so a dead
                # connection is never reused.
                self.connection = None
            print("Database connection closed.")
This is what it looks like when running:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
[postgres@dbadmin ~]$ python3.11 refresh_replication_subscriptions.py
executing refresh against host: ['crmrepdb', 'c']
Database connection established.
Query executed successfully.
Database connection closed.
executing refresh against host: ['crmrepdb', 'j']
Database connection established.
Query executed successfully.
Database connection closed.
executing refresh against host: ['maindb', 'a']
Database connection established.
Query executed successfully.
Database connection closed.
executing refresh against host: ['maindb', 'c']