You can use this Python code to set up a cron job that syncs PostgreSQL tables from one database to another. It reads its settings from a config file and can sync multiple tables in a single run. This can be useful for syncing a daily table from a source database to a destination database. It also sends an alert to Slack for every command, reporting whether it succeeded (Ok) or failed (Critical).
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
[logging]
log_file = pg_table_sync_dev_to_prod.log
log_path = /home/postgres

[pg_table_source_dest]
public.jason_test_table1 = public.jason_test_table1
public.jason_test_table2 = public.jason_test_table2
public.jason_test_table3 = public.jason_test_table3

[hosts]
source_db = db-sbx01
dest_db = db10

[database]
dev_db = devdb
prod_db = proddb

[dump_location]
local_location = /u04/pg_data_dumps/transfer_tables/

[slack]
webhook = https://hooks.slack.com/services/<yourwebhookhere>
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 |
__author__ = 'jralph'
__version__ = '1.0.0'

import configparser
import os
import sys
import logging
import subprocess
import shlex
import socket
import datetime

import requests

# Name of the host this script runs on.
hostname = socket.gethostname()

# Timestamp captured once at start-up, so every value derived from it during
# this run uses the same date.
now = datetime.datetime.now()

# Script name without directory or extension.  Splitting sys.argv[0] on the
# first '.' returned '' for './script.py' and kept the directory for absolute
# paths, so the name is derived with os.path instead.
script_name = os.path.splitext(os.path.basename(sys.argv[0]))[0]

# Sanity check for the configuration environment variable.
if "INI_PATH" not in os.environ:
    print('INI_PATH is not set, check the .bashrc')
    sys.exit(1)

# Parse the logging and Slack sections of the ini file.  Any configparser
# error (missing section, missing option, malformed file) is fatal: nothing
# below can work without these values, so stop here instead of continuing
# with undefined names.
config = configparser.ConfigParser()
ini_file = os.path.join(os.environ['INI_PATH'], 'pg_table_sync_dev_prod.ini')
try:
    # ConfigParser.read() skips files it cannot open and returns the list of
    # files it actually parsed, so an empty result means the ini is missing.
    if not config.read(ini_file):
        raise configparser.Error('cannot read {0}'.format(ini_file))
    log_file = config.get('logging', 'log_file')
    log_path = config.get('logging', 'log_path')
    slack_hook = config.get('slack', 'webhook')
except configparser.Error as e:
    print('FATAL: Command failed with error [{0}]'.format(e))
    sys.exit(1)

# Set up logging to the configured file.  An unwritable or missing log
# directory surfaces as OSError when the file handler is created.
try:
    logging.basicConfig(filename=os.path.join(log_path, log_file),
                        format='%(asctime)s %(message)s',
                        datefmt='%m-%d-%Y %I:%M:%S %p -',
                        level=logging.DEBUG)
except OSError as e:
    print('FATAL: Command failed with error [{0}]'.format(e))
    sys.exit(1)

# Get hosts, database names, dump directory and the table mapping.
try:
    pg_tables_to_sync = dict(config.items('pg_table_source_dest'))
    source_db = config.get('hosts', 'source_db')
    dest_db = config.get('hosts', 'dest_db')
    dump_location = config.get('dump_location', 'local_location')
    dev_db = config.get('database', 'dev_db')
    prod_db = config.get('database', 'prod_db')
except configparser.Error as e:
    logging.critical('FATAL: Command failed with error [{0}]'.format(e))
    sys.exit(1)


# pg_dump function.
def _dump_file_path(table):
    """Return the dump file path for *table*, stamped with this run's date.

    Uses os.path.join so the configured dump directory works with or without
    a trailing slash.
    """
    file_name = '{0}.{1}.pgdump'.format(table, now.strftime("%Y%m%d"))
    return os.path.join(dump_location, file_name)


def _shell_join(args):
    """Join *args* into one command string that shlex.split() can reverse."""
    return ' '.join(shlex.quote(str(arg)) for arg in args)


def pg_dump():
    """Build one ``pg_dump`` command string per configured table.

    Tables are taken from ``pg_tables_to_sync`` in sorted key order.  Only the
    key (source table) is used; the mapped value is currently ignored.

    Returns:
        A list of command strings.  The list is empty when a required
        module-level setting is undefined; that case is logged as critical.
    """
    try:
        return [
            _shell_join(['pg_dump', '-Fc', '-h', source_db, '-d', dev_db,
                         '-t', table, '-f', _dump_file_path(table)])
            for table, _dest in sorted(pg_tables_to_sync.items())
        ]
    except NameError as e:
        logging.critical('FATAL: Command failed with error [{0}]'.format(e))
        return []


# pg_restore function.
def pg_restore():
    """Build one ``pg_restore`` command string per configured table.

    Commands are produced in the same sorted key order as :func:`pg_dump` and
    point at the same dump files, so index *i* of both lists refers to the
    same table.

    Returns:
        A list of command strings.  The list is empty when a required
        module-level setting is undefined; that case is logged as critical.
    """
    try:
        return [
            _shell_join(['pg_restore', '-c', '-h', dest_db, '-d', prod_db,
                         _dump_file_path(table)])
            for table, _dest in sorted(pg_tables_to_sync.items())
        ]
    except NameError as e:
        logging.critical('FATAL: Command failed with error [{0}]'.format(e))
        return []


# send to slack function.
def send_to_slack(slack_url, state, command, date_format, priority, target_os,
                  timeout=10):
    """Post a status attachment to a Slack incoming webhook.

    Args:
        slack_url: Webhook URL to post to.
        state: Status text shown as the field value (callers in this file
            pass 'Ok' or 'Critical').
        command: Text shown as the attachment author name.
        date_format: Value rendered with ``%s`` as the attachment text.
        priority: Attachment color (callers in this file pass 'good' or
            'danger').
        target_os: Value rendered with ``%s`` as the field title.
        timeout: Seconds to wait for Slack before giving up, so an
            unreachable endpoint cannot hang the run indefinitely.

    Raises:
        ValueError: If the request cannot be completed or Slack answers with
            a status other than 200.
    """
    slack_data = {'attachments': [
        {
            "fallback": "Required plain-text summary of the attachment.",
            "color": priority,
            "pretext": "PG Table Sync",
            "author_name": command,
            "text": "%s" % date_format,
            "fields": [
                {
                    "title": "%s" % target_os,
                    "value": state,
                    "short": "false"
                }
            ],
            "footer": "AFS Slack",
            "footer_icon": "https://platform.slack-edge.com"
                           "/img/default_application_icon.png"
        }
    ]}
    try:
        response = requests.post(slack_url, json=slack_data, timeout=timeout)
    except requests.RequestException as e:
        # Callers only handle ValueError; translate transport failures
        # (DNS, connection refused, timeout) so they do not abort the sync.
        raise ValueError('Request to slack failed with error %s' % e) from e
    if response.status_code != 200:
        raise ValueError(
            'Request to slack returned an error %s, the response is:\n%s'
            % (response.status_code, response.text))


# execute with logging.
def _notify_slack(state, cmd, priority):
    """Report a command outcome to Slack without ever aborting the run.

    Any failure to post (bad HTTP status or transport error) is logged as
    critical and swallowed, because a Slack outage must not stop the sync.
    """
    try:
        send_to_slack(slack_hook, state, cmd, datetime.datetime.today(),
                      priority, hostname)
    except (ValueError, requests.RequestException) as e:
        logging.critical('FATAL: Slack post failed with error [%s]' % e)


def execute_jobs(cmd):
    """Run one command string, log the outcome and alert Slack.

    Args:
        cmd: Command line; it is split with ``shlex.split`` and executed
            without a shell.

    Returns:
        True if the command exited with status 0, False if it exited
        non-zero or could not be started (for example the binary is not on
        PATH).
    """
    logging.info('Start Command: [%s]' % cmd)
    try:
        subprocess.run(shlex.split(cmd), check=True)
    except (subprocess.CalledProcessError, OSError) as e:
        # OSError covers a missing or non-executable binary, which would
        # otherwise crash the script before any alert is sent.
        logging.critical('[%s] FATAL: Command failed with error [%s]'
                         % (cmd, e))
        _notify_slack('Critical', cmd, 'danger')
        return False
    logging.info('Command Success: [%s]' % cmd)
    _notify_slack('Ok', cmd, 'good')
    return True


# main
def main():
    """Run every dump, then every restore whose dump succeeded.

    Returns:
        The number of tables whose dump or restore did not complete.
    """
    dump_cmds = pg_dump()
    restore_cmds = pg_restore()

    # Run all dumps first, remembering which ones succeeded.
    dump_ok = [execute_jobs(command) for command in dump_cmds]
    failures = dump_ok.count(False)

    # pg_dump() and pg_restore() build their lists from the same sorted table
    # mapping, so entries at the same index belong to the same table.
    for succeeded, command in zip(dump_ok, restore_cmds):
        if not succeeded:
            # Restoring without a fresh dump could load a stale file.
            logging.critical('Skipping restore, dump failed: [%s]' % command)
            continue
        if not execute_jobs(command):
            failures += 1

    logging.info('finished ' + script_name)
    return failures


if __name__ == "__main__":
    # Non-zero exit status lets cron (or a wrapper) notice a failed sync.
    sys.exit(1 if main() else 0)
LOGGING EXAMPLE:
1 2 3 4 5 6 7 8 9 |
07-06-2020 11:44:28 AM - Start Command: [pg_dump -Fc -h db-sbx01 -d db1 -t public.jason_test_table -f /u04/pg_data_dumps/transfer_tables/public.jason_test_table.20200706.pgdump]
07-06-2020 11:44:30 AM - Command Success: [pg_dump -Fc -h db-sbx01 -d db1 -t public.jason_test_table -f /u04/pg_data_dumps/transfer_tables/public.jason_test_table.20200706.pgdump]
07-06-2020 11:44:30 AM - Starting new HTTPS connection (1): hooks.slack.com
07-06-2020 11:44:30 AM - https://hooks.slack.com:443 "POST /services/<yourwebhookhere> HTTP/1.1" 200 22
07-06-2020 11:44:30 AM - Start Command: [pg_restore -c -h db10 -d db1 /u04/pg_data_dumps/transfer_tables/public.jason_test_table.20200706]
07-06-2020 11:44:31 AM - Command Success: [pg_restore -c -h db10 -d db1 /u04/pg_data_dumps/transfer_tables/public.jason_test_table.20200706]
07-06-2020 11:44:31 AM - Starting new HTTPS connection (1): hooks.slack.com
07-06-2020 11:44:31 AM - https://hooks.slack.com:443 "POST /services/<yourwebhookhere> HTTP/1.1" 200 22
07-06-2020 11:44:31 AM - finished PgTableSyncDevProd