Created
October 12, 2019 20:55
-
-
Save kedder/ecaf2456589cd51773b5d63cb9bfb1cd to your computer and use it in GitHub Desktop.
Simple script to copy data from a PostgreSQL database to a MySQL database that has the same schema.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Convert postgres database to mysql on live running servers | |
Schemas in both databases should match. | |
""" | |
from pprint import pprint | |
import pymysql.cursors | |
import psycopg2 | |
import psycopg2.extras | |
# Shared connection handles used by every helper below. Both stay None until
# connect() has been called.
mysqlconn = pgconn = None
def connect():
    """Open both database connections and publish them as module globals.

    The MySQL connection is created with ``DictCursor`` so that its cursors
    return rows as dicts. All connection parameters are hard-coded here.
    """
    global mysqlconn, pgconn

    mysql_settings = dict(
        host="localhost",
        user="root",
        password="mysql",
        db="partkeepr",
        port=13306,
        charset="utf8mb4",
        cursorclass=pymysql.cursors.DictCursor,
    )
    mysqlconn = pymysql.connect(**mysql_settings)
    print(f"Connected to mysql: {mysqlconn}")

    pg_dsn = "dbname=partkeepr user=partkeepr host=localhost port=15432"
    pgconn = psycopg2.connect(pg_dsn)
    print(f"Connected to postgres: {pgconn}")
def mysql_refintegrity(enable):
    """Turn MySQL foreign-key checking on or off on the shared connection.

    Args:
        enable: Truthy to enable the checks, falsy to disable them. The value
            is converted with ``int()`` before being placed in the statement.
    """
    with mysqlconn.cursor() as cur:
        cur.execute(f"SET FOREIGN_KEY_CHECKS = {int(enable)}")
def fetch_mysql_tables():
    """Return the names of all tables reported by MySQL's ``SHOW TABLES``.

    Returns:
        A list with one table name per result row.
    """
    names = []
    with mysqlconn.cursor() as cur:
        cur.execute("SHOW TABLES")
        for row in cur.fetchall():
            # Rows are dicts (column name -> value); the table name is the
            # first value, whatever the column happens to be called.
            names.append(list(row.values())[0])
    return names
def fetch_columns(table):
    """Return the column names of a MySQL table, in ``SHOW COLUMNS`` order.

    Args:
        table: Unqualified table name (no ``db.`` prefix). It is quoted with
            backticks, so reserved words and unusual characters are accepted.

    Returns:
        A list holding the ``Field`` value of every row MySQL reports.
    """
    # Identifiers cannot be sent as bound parameters, so quote the name
    # explicitly; an embedded backtick is escaped by doubling it.
    quoted = "`" + table.replace("`", "``") + "`"
    with mysqlconn.cursor() as cursor:
        cursor.execute(f"SHOW COLUMNS FROM {quoted}")
        rows = cursor.fetchall()
    return [row["Field"] for row in rows]
def fetch_pg_data(table):
    """Load every row of a PostgreSQL table as a list of plain dicts.

    Args:
        table: Table name. It is interpolated into the statement as-is, so it
            must be a trusted identifier.

    Returns:
        One ``dict`` per row, built from the ``DictCursor`` result rows.
    """
    with pgconn.cursor(cursor_factory=psycopg2.extras.DictCursor) as pg_cursor:
        pg_cursor.execute("SELECT * FROM %s" % table)
        rows = pg_cursor.fetchall()
    # Convert the driver's row objects into ordinary dicts.
    return list(map(dict, rows))
def _quote_mysql_identifier(name):
    """Wrap *name* in backticks, doubling any backtick it contains."""
    return "`" + name.replace("`", "``") + "`"


def make_mysql_inserts(table, columns, data):
    """Build one parameterized MySQL ``INSERT`` per source row.

    Args:
        table: Unqualified name of the MySQL table to insert into.
        columns: Column names as MySQL spells them.
        data: Iterable of row dicts. Each value is looked up under the
            lower-cased column name.

    Returns:
        A list of ``(sql, values)`` tuples, one per row, where ``sql`` uses
        ``%s`` placeholders and ``values`` is the matching list of parameters.

    Raises:
        KeyError: If a row has no key for a lower-cased column name.
    """
    # The statement text depends only on the table and its columns, so build
    # it once, not once per row. Identifiers are backtick-quoted so that
    # reserved words (e.g. `order`, `key`) remain valid names.
    collist = ", ".join(_quote_mysql_identifier(c) for c in columns)
    params = ", ".join(["%s"] * len(columns))
    sql = f"INSERT INTO {_quote_mysql_identifier(table)} ({collist}) VALUES ({params})"

    row_keys = [c.lower() for c in columns]
    return [(sql, [row[key] for key in row_keys]) for row in data]
def clean_mysql_table(table):
    """Delete every row from a MySQL table on the shared connection.

    Args:
        table: Unqualified table name (no ``db.`` prefix). It is quoted with
            backticks, so reserved words and unusual characters are accepted.
    """
    # Identifiers cannot be sent as bound parameters, so quote the name
    # explicitly; an embedded backtick is escaped by doubling it.
    quoted = "`" + table.replace("`", "``") + "`"
    with mysqlconn.cursor() as cursor:
        cursor.execute(f"DELETE FROM {quoted}")
def insert_mysql_data(table, inserts):
    """Run prepared insert statements against MySQL, echoing each one.

    Args:
        table: Not used by this function; kept for the callers' signature.
        inserts: Iterable of ``(sql, params)`` pairs to execute in order.
    """
    with mysqlconn.cursor() as cur:
        for statement, values in inserts:
            # Only the statement text is printed, never the bound values.
            print(f"EXEC: {statement}")
            cur.execute(statement, values)
def main():
    """Copy every table's rows from PostgreSQL into MySQL.

    The table list and column lists are read from MySQL. For each table the
    PostgreSQL rows are fetched, the MySQL table is emptied, and the rows are
    re-inserted. Foreign-key checks are disabled for the duration of the copy
    and everything is committed once at the end.

    Both connections are closed when the function exits, including when a
    step raises. In that case nothing is committed on the MySQL side.
    """
    try:
        connect()
        tables = fetch_mysql_tables()
        mysql_schema = {t: fetch_columns(t) for t in tables}
        pprint(mysql_schema)

        # Tables are refilled in arbitrary order, so referential checks have
        # to be off while rows are missing their parents.
        mysql_refintegrity(False)
        for t in tables:
            # Read the source rows first, so a failing PostgreSQL read
            # happens before this table is emptied.
            data = fetch_pg_data(t)
            inserts = make_mysql_inserts(t, mysql_schema[t], data)
            clean_mysql_table(t)
            insert_mysql_data(t, inserts)
        mysql_refintegrity(True)

        mysqlconn.commit()
    finally:
        # connect() may have failed part-way, so either handle can still be
        # None. The nested finally makes sure a failing MySQL close does not
        # skip the PostgreSQL close.
        try:
            if mysqlconn is not None:
                mysqlconn.close()
        finally:
            if pgconn is not None:
                pgconn.close()


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment