{"id":1281,"date":"2025-09-17T13:24:40","date_gmt":"2025-09-17T17:24:40","guid":{"rendered":"https:\/\/jasonralph.org\/?p=1281"},"modified":"2025-09-17T18:57:15","modified_gmt":"2025-09-17T22:57:15","slug":"refresh-logical-replication-subscription-from-publisher-postgres-on-multiple-hosts","status":"publish","type":"post","link":"https:\/\/jasonralph.org\/?p=1281","title":{"rendered":"Postgres Refresh Logical Replication Subscription From Publisher On Multiple Hosts"},"content":{"rendered":"<p>I needed a way to refresh a Postgres logical replication subscription on many hosts, so I wrote a Python script that reads a list of hostnames and runs the refresh against each one.  Each subscription name is built from the host name (the hyphen becomes an underscore and _subscription01 is appended), and I have a master server from which I can run psql against each host as postgres.<\/p>\n<p>Here are the contents of the host file:<\/p>\n<pre class=\"theme:solarized-dark lang:default decode:true \" >\r\n[postgres@dbadmin ~]$ cat contentdb_subscribed_hosts.txt \r\ncrmrepdb-c\r\ncrmrepdb-j\r\nmaindb-a\r\nmaindb-c\r\n<\/pre>\n<p>Here is what the logical replication subscriptions look like:<\/p>\n<pre class=\"theme:solarized-dark lang:default decode:true \" >\r\ncrmrepdb_c_subscription01\r\ncrmrepdb_j_subscription01\r\nmaindb_a_subscription01\r\nmaindb_c_subscription01\r\n<\/pre>\n<p>Here is the Python code:<\/p>\n<pre class=\"theme:solarized-dark lang:default decode:true \" >\r\n#!\/usr\/bin\/python3.11\r\nfrom utils import PostgresDB\r\n\r\n\r\ndef run_query():\r\n    try:\r\n        with open('contentdb_subscribed_hosts.txt', \"r\") as file:\r\n            for host in file:\r\n                if \"-\" not in host:\r\n                    hostname = host.strip()\r\n                    sql_command = (\r\n                        f\"ALTER SUBSCRIPTION \"\r\n                        f\"{hostname}_subscription01 \"\r\n                        f\"REFRESH PUBLICATION;\"\r\n                    )\r\n                    
print(f\"executing refresh against host: {hostname}\")\r\n                else:\r\n                    hostname = host.strip()\r\n                    parsed_host = hostname.split('-')\r\n                    print(f\"executing refresh against host: {parsed_host}\")\r\n                    sql_command = (\r\n                        f\"ALTER SUBSCRIPTION \"\r\n                        f\"{parsed_host[0]}_{parsed_host[1]}_subscription01 \"\r\n                        f\"REFRESH PUBLICATION;\"\r\n                    )\r\n                db = PostgresDB.PostgresDB(\r\n                    host=f\"{hostname}\",\r\n                    database=\"famnet5\",\r\n                    user=\"postgres\",\r\n                    port=5432,\r\n                    password=''\r\n                    )\r\n                try:\r\n                    db.connect()\r\n                    db.execute_query(f\"{sql_command}\")\r\n                finally:\r\n                    db.close()\r\n    except Exception as e:\r\n        print(f\"Error: {e}\")\r\n\r\n\r\nif __name__ == \"__main__\":\r\n    run_query()\r\n\r\n<\/pre>\n<p>Here is the contents of the utils lib:<\/p>\n<pre class=\"theme:solarized-dark lang:default decode:true \" >\r\nimport psycopg2\r\nfrom psycopg2 import sql, OperationalError, DatabaseError\r\n\r\nclass PostgresDB:\r\n    def __init__(self, host, database, user, password, port=5432):\r\n        \"\"\"\r\n        Initialize the database connection parameters.\r\n        \"\"\"\r\n        self.host = host\r\n        self.database = database\r\n        self.user = user\r\n        self.password = password\r\n        self.port = port\r\n        self.connection = None\r\n\r\n    def connect(self):\r\n        \"\"\"\r\n        Establish a connection to the PostgreSQL database.\r\n        \"\"\"\r\n        try:\r\n            self.connection = psycopg2.connect(\r\n                host=self.host,\r\n                database=self.database,\r\n                user=self.user,\r\n        
        password=self.password,\r\n                port=self.port\r\n            )\r\n            self.connection.autocommit = True\r\n            print(\"Database connection established.\")\r\n        except OperationalError as e:\r\n            print(f\"Error connecting to database: {e}\")\r\n            raise\r\n\r\n    def execute_query(self, query, params=None):\r\n        try:\r\n            with self.connection.cursor() as cursor:\r\n                cursor.execute(query, params)\r\n                self.connection.commit()\r\n                print(\"Query executed successfully.\")\r\n        except DatabaseError as e:\r\n            self.connection.rollback()\r\n            print(f\"Error executing query: {e}\")\r\n            raise\r\n\r\n    def fetch_data(self, query, params=None):\r\n        try:\r\n            with self.connection.cursor() as cursor:\r\n                cursor.execute(query, params)\r\n                result = cursor.fetchall()\r\n                return result\r\n        except DatabaseError as e:\r\n            print(f\"Error fetching data: {e}\")\r\n            raise\r\n\r\n    def close(self):\r\n        if self.connection:\r\n            self.connection.close()\r\n            print(\"Database connection closed.\")\r\n\r\n<\/pre>\n<p>This is what it looks like when running:<\/p>\n<pre class=\"theme:solarized-dark lang:default decode:true \" >\r\n[postgres@dbadmin ~]$ python3.11 refresh_replication_subscriptions.py \r\nexecuting refresh against host: ['crmrepdb', 'c']\r\nDatabase connection established.\r\nQuery executed successfully.\r\nDatabase connection closed.\r\nexecuting refresh against host: ['crmrepdb', 'j']\r\nDatabase connection established.\r\nQuery executed successfully.\r\nDatabase connection closed.\r\nexecuting refresh against host: ['maindb', 'a']\r\nDatabase connection established.\r\nQuery executed successfully.\r\nDatabase connection closed.\r\nexecuting refresh against host: ['maindb', 
'c']\r\n<\/pre>\n","protected":false},"excerpt":{"rendered":"<p>I needed a way to refresh a Postgres logical replication subscription on many hosts, so I wrote a Python script that reads a list of hostnames and runs the refresh against each one. Each subscription name is built from the host name, and I have a master server that [&hellip;]<\/p>\n","protected":false},"author":1,"featured_media":0,"comment_status":"open","ping_status":"open","sticky":false,"template":"","format":"standard","meta":{"footnotes":""},"categories":[1,4],"tags":[154,51,156,24,157,155],"class_list":["post-1281","post","type-post","status-publish","format-standard","hentry","category-general-code","category-python","tag-logical-replication","tag-postgres","tag-publication","tag-python-2","tag-refresh","tag-replication"],"_links":{"self":[{"href":"https:\/\/jasonralph.org\/index.php?rest_route=\/wp\/v2\/posts\/1281","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/jasonralph.org\/index.php?rest_route=\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/jasonralph.org\/index.php?rest_route=\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/jasonralph.org\/index.php?rest_route=\/wp\/v2\/users\/1"}],"replies":[{"embeddable":true,"href":"https:\/\/jasonralph.org\/index.php?rest_route=%2Fwp%2Fv2%2Fcomments&post=1281"}],"version-history":[{"count":4,"href":"https:\/\/jasonralph.org\/index.php?rest_route=\/wp\/v2\/posts\/1281\/revisions"}],"predecessor-version":[{"id":1285,"href":"https:\/\/jasonralph.org\/index.php?rest_route=\/wp\/v2\/posts\/1281\/revisions\/1285"}],"wp:attachment":[{"href":"https:\/\/jasonralph.org\/index.php?rest_route=%2Fwp%2Fv2%2Fmedia&parent=1281"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/jasonralph.org\/index.php?rest_route=%2Fwp%2Fv2%2Fcategories&post=1281"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/jasonralph.org\/index.php?rest_route=%2Fwp%2Fv2%2Ftags&post=1281"}
],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}