Skip to content

Instantly share code, notes, and snippets.

@kedder
Created October 12, 2019 20:55
Show Gist options
  • Save kedder/ecaf2456589cd51773b5d63cb9bfb1cd to your computer and use it in GitHub Desktop.
Save kedder/ecaf2456589cd51773b5d63cb9bfb1cd to your computer and use it in GitHub Desktop.
Simple script to copy data from postgresql database to mysql with the same schema
"""Convert postgres database to mysql on live running servers
Schemas in both databases should match.
"""
from pprint import pprint
import pymysql.cursors
import psycopg2
import psycopg2.extras
mysqlconn = None
pgconn = None
def connect():
global mysqlconn
global pgconn
mysqlconn = pymysql.connect(
host="localhost",
user="root",
password="mysql",
db="partkeepr",
port=13306,
charset="utf8mb4",
cursorclass=pymysql.cursors.DictCursor,
)
print(f"Connected to mysql: {mysqlconn}")
pgconn = psycopg2.connect(
"dbname=partkeepr user=partkeepr host=localhost port=15432"
)
print(f"Connected to postgres: {pgconn}")
def mysql_refintegrity(enable):
with mysqlconn.cursor() as cursor:
cursor.execute("SET FOREIGN_KEY_CHECKS = %s" % int(enable))
def fetch_mysql_tables():
with mysqlconn.cursor() as cursor:
# Read a single record
sql = "SHOW TABLES"
cursor.execute(sql)
result = cursor.fetchall()
return [list(r.values())[0] for r in result]
def fetch_columns(table):
with mysqlconn.cursor() as cursor:
cursor.execute("SHOW COLUMNS FROM %s" % table)
result = cursor.fetchall()
return [r["Field"] for r in result]
def fetch_pg_data(table):
with pgconn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
cur.execute("SELECT * FROM %s" % table)
result = cur.fetchall()
return [dict(r) for r in result]
def make_mysql_inserts(table, columns, data):
inserts = []
for row in data:
values = [row[c.lower()] for c in columns]
collist = ", ".join(columns)
params = ", ".join(["%s"] * len(columns))
sql = f"INSERT INTO {table} ({collist}) VALUES ({params})"
inserts.append((sql, values))
return inserts
def clean_mysql_table(table):
with mysqlconn.cursor() as cursor:
cursor.execute(f"DELETE FROM {table}")
def insert_mysql_data(table, inserts):
with mysqlconn.cursor() as cursor:
for sql, params in inserts:
print(f"EXEC: {sql}")
cursor.execute(sql, params)
def main():
connect()
tables = fetch_mysql_tables()
mysql_schema = {t: fetch_columns(t) for t in tables}
pprint(mysql_schema)
mysql_refintegrity(False)
for t in tables:
data = fetch_pg_data(t)
inserts = make_mysql_inserts(t, mysql_schema[t], data)
clean_mysql_table(t)
insert_mysql_data(t, inserts)
mysql_refintegrity(True)
mysqlconn.commit()
mysqlconn.close()
pgconn.close()
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment