Skip to content

Instantly share code, notes, and snippets.

@Havoc24k
Last active September 29, 2023 08:55
Show Gist options
  • Star 12 You must be signed in to star a gist
  • Fork 4 You must be signed in to fork a gist
  • Save Havoc24k/d646eb54925e8777a01c45b33e4a0f68 to your computer and use it in GitHub Desktop.
Save Havoc24k/d646eb54925e8777a01c45b33e4a0f68 to your computer and use it in GitHub Desktop.
Import tables from one PostgreSQL database to another using the INFORMATION_SCHEMA
#!/usr/bin/python3.6
"""Sync PostgreSQL."""
import psycopg2
import sys
from psycopg2.extras import RealDictCursor, execute_values
"""
Usage:
    sync.py [-f]

Options:
    -f  Force import tables without asking for user input per table
"""
"""
Configure the info for connecting to both databases
"""
# Source database: the tables are read from this server.
# SOURCE_URL is handed to psycopg2 as the ``host`` argument.
SOURCE_USERNAME = ''
SOURCE_PASSWD = ''
SOURCE_URL = ''
SOURCE_PORT = 5432
SOURCE_DB = ''

# Destination database: the tables are dropped, recreated and filled here.
# DEST_URL is handed to psycopg2 as the ``host`` argument.
DEST_USERNAME = ''
DEST_PASSWD = ''
DEST_URL = ''
DEST_PORT = 5432
DEST_DB = ''

# Tables to import, one entry per table, written as 'schema_name.table_name'.
# An entry without a schema part is looked up in the 'public' schema.
list_of_tables = ['schema_name.table_name']
def format_drop_create_query(table_name, columns, table_schema='public'):
    """Build the ``DROP TABLE`` / ``CREATE TABLE`` SQL for one table.

    Args:
        table_name: Name of the table to drop and recreate.
        columns: Ordered sequence of mappings, one per column, each with the
            keys ``column_name``, ``data_type`` and ``is_nullable`` (and,
            for character types, ``character_maximum_length``) as returned
            by ``INFORMATION_SCHEMA.COLUMNS``.
        table_schema: Schema the table is created in.

    Returns:
        A string holding the ``DROP TABLE IF EXISTS`` statement followed by
        the ``CREATE TABLE`` statement.
    """
    # Types whose declared length lives in character_maximum_length, mapped
    # to the spelling used in the generated DDL.
    sized_types = {'character varying': 'varchar', 'character': 'character'}

    column_defs = []
    for c_data in columns:
        data_type = c_data['data_type']
        if data_type in sized_types:
            data_type = sized_types[data_type]
            max_length = c_data.get('character_maximum_length')
            # An unbounded varchar reports a NULL length; emitting
            # "varchar(None)" would be a syntax error, so the length is only
            # appended when one is actually declared.
            if max_length is not None:
                data_type = '{}({})'.format(data_type, max_length)

        column_def = '"{}" {}'.format(c_data['column_name'], data_type)
        if c_data['is_nullable'] == 'NO':
            column_def += ' NOT NULL'
        column_defs.append(column_def)

    q_drop = 'DROP TABLE IF EXISTS "{}"."{}";'.format(table_schema, table_name)
    q_create_body = 'CREATE TABLE "{}"."{}" (\n{}\n);\n'.format(
        table_schema, table_name, ',\n'.join(column_defs))

    return q_drop + '\n' + q_create_body
# Column metadata lookup against the source database. The table and schema
# names are passed as bound parameters instead of being formatted into the
# SQL text.
_TABLE_STRUCT_QUERY = '''
    SELECT
        table_schema,
        column_name,
        data_type,
        character_maximum_length,
        is_nullable
    FROM
        INFORMATION_SCHEMA.COLUMNS
    WHERE
        table_name = %s
        AND table_schema = %s
    ORDER BY ordinal_position
'''

# Every table is recreated in this schema of the destination database,
# whichever schema it was read from in the source.
_DEST_SCHEMA = 'public'


def _split_table_name(qualified_name):
    """Split 'schema.table' into (schema, table); other names use 'public'."""
    parts = qualified_name.split('.')
    if len(parts) == 2:
        return parts[0], parts[1]
    return 'public', qualified_name


def _import_table(src_cur, dest, dest_cur, qualified_name):
    """Recreate one source table in the destination, copy its rows, commit."""
    table_schema, table_name = _split_table_name(qualified_name)

    print('Getting table data from SOURCE...', '\n')
    src_cur.execute(
        'SELECT * FROM "{}"."{}";'.format(table_schema, table_name))
    table_data = src_cur.fetchall()

    print('Create table to DEST...', '\n')
    src_cur.execute(_TABLE_STRUCT_QUERY, (table_name, table_schema))
    dest_cur.execute(format_drop_create_query(
        table_name, columns=src_cur.fetchall(), table_schema=_DEST_SCHEMA))
    if dest.notices:
        print(dest.notices, '\n')
        # Clear what was just shown so it is not reprinted for later tables.
        del dest.notices[:]

    print('Insert data to DEST...', '\n')
    # An empty source table has no first row to take column names from;
    # the (empty) table is still created, only the INSERT is skipped.
    if table_data:
        column_names = list(table_data[0])
        # Quote the column names exactly like the CREATE TABLE statement
        # does, so mixed-case and reserved-word columns resolve.
        q_insert = 'INSERT INTO "{}"."{}" ({}) VALUES %s'.format(
            _DEST_SCHEMA,
            table_name,
            ','.join('"{}"'.format(name) for name in column_names))
        execute_values(
            dest_cur,
            q_insert,
            argslist=[[row[name] for name in column_names]
                      for row in table_data],
            template=None,
            page_size=100
        )

    dest.commit()


def main():
    """Copy every table in ``list_of_tables`` from SOURCE to DEST.

    Unless the first command-line argument is ``-f``, the user is asked to
    confirm each table before it is imported.
    """
    prompt = not (len(sys.argv) > 1 and sys.argv[1] == '-f')

    # Bound before the try block so the handlers below can tell whether a
    # connection was ever opened.
    src = None
    dest = None
    try:
        src = psycopg2.connect(
            database=SOURCE_DB,
            user=SOURCE_USERNAME,
            password=SOURCE_PASSWD,
            host=SOURCE_URL,
            port=SOURCE_PORT,
            cursor_factory=RealDictCursor
        )
        src_cur = src.cursor()

        dest = psycopg2.connect(
            database=DEST_DB,
            user=DEST_USERNAME,
            password=DEST_PASSWD,
            host=DEST_URL,
            port=DEST_PORT,
            cursor_factory=RealDictCursor
        )
        dest_cur = dest.cursor()

        print('================================\n')

        for qualified_name in list_of_tables:
            print('Importing {} \nfrom: [SOURCE] {} \nto: [DEST] {}\n'.format(
                qualified_name, src.dsn, dest.dsn))

            if prompt:
                user_resp = input('Do you want to continue? [N/y] ')
                if user_resp not in ('y', 'Y'):
                    print('Skiping table {}'.format(qualified_name))
                    continue

            print('Starting import...\n')
            _import_table(src_cur, dest, dest_cur, qualified_name)
            print('...finished import\n')
            print('================================\n')
    except psycopg2.Error as e:
        # Undo the uncommitted work of the table that failed.
        # NOTE(review): tables committed in earlier iterations stay in the
        # destination, so the message below overstates what is rolled back.
        if dest is not None and not dest.closed:
            dest.rollback()
        print('An error occured, ALL actions have been rollbacked.', '\n')
        print(e, '\n')
        print('================================\n')
    finally:
        # Close whatever was opened, on success and on any kind of failure.
        for conn in (src, dest):
            if conn is not None:
                conn.close()


if __name__ == '__main__':
    main()
@neeweesomu
Copy link

An error occured, ALL actions have been rollbacked.

syntax error at or near "None"
LINE 20: "remarks" varchar(None)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment