Skip to content

Instantly share code, notes, and snippets.

@arkarkark
Forked from techouse/sqlite3mysql.py
Last active April 13, 2022 17:08
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save arkarkark/88f525e10edd62c7e82788c736e85911 to your computer and use it in GitHub Desktop.
Save arkarkark/88f525e10edd62c7e82788c736e85911 to your computer and use it in GitHub Desktop.
A simple Python 3 script to transfer the data from SQLite 3 to MySQL. Requires MySQL Connector/Python 2.0.4 or higher.
#!/usr/bin/env python3
# pylint: disable=superfluous-parens,old-style-class,invalid-encoded-data,too-many-instance-attributes
# gist -u 88f525e10edd62c7e82788c736e85911 ~/bin/share/sqlite3mysql.py
# Originally From: https://gist.github.com/techouse/4deb94eee58a02d104c6
# Module metadata: author, contact e-mail, copyright/license string, version,
# release date and release status of the original script.
__author__ = "Klemen Tušar"
__email__ = "techouse@gmail.com"
__copyright__ = "GPL"
__version__ = "1.0.1"
__date__ = "2015-09-12"
__status__ = "Production"
import mysql.connector, os.path, re, sqlite3
from mysql.connector import errorcode
class SQLite3toMySQL:
    """Transfer the tables of an SQLite 3 database into a MySQL database.

    NOTE: Requires MySQL Connector/Python 2.0.4 or higher
    (https://dev.mysql.com/downloads/connector/python/)

    Keyword arguments accepted by the constructor:

    * ``sqlite_file`` (required): path to an existing SQLite database file.
    * ``mysql_user`` / ``mysql_password`` (required): MySQL credentials.
    * ``mysql_database`` (default ``"transfer"``), ``mysql_host`` (default
      ``"localhost"``).
    * ``mysql_integer_type``, ``mysql_decimal_type``, ``mysql_string_type``:
      MySQL column types used for integer, decimal and text columns.
    * ``reconnect_on_failure`` (default ``True``): reconnect and continue
      with the next table when inserting data fails, instead of exiting.
    * ``skip_existing_tables``: do not touch tables that already exist in
      the target database.
    * ``clear_tables_before_transfer``: drop each table before creating it.
    * ``table_row_limit``: copy at most this many rows per table.
    * ``table_names`` (comma separated) / ``table_names_re``: only transfer
      tables that are named or whose name matches the regular expression.
    * ``ignore_table_names`` (comma or whitespace separated) /
      ``ignore_table_names_re``: never transfer these tables.

    The constructor terminates the process with ``SystemExit(1)`` when a
    required argument is missing, and connects to both databases at once.
    """

    def __init__(self, **kwargs):
        self._properties = kwargs

        self._sqlite_file = kwargs.get("sqlite_file")
        # A missing path must produce the friendly message, not a TypeError
        # from os.path.isfile(None).
        if not self._sqlite_file or not os.path.isfile(self._sqlite_file):
            print("SQLite file does not exist!")
            raise SystemExit(1)

        self._mysql_user = kwargs.get("mysql_user")
        if self._mysql_user is None:
            print("Please provide a MySQL user!")
            raise SystemExit(1)

        self._mysql_password = kwargs.get("mysql_password")
        if self._mysql_password is None:
            print("Please provide a MySQL password")
            raise SystemExit(1)

        # ``or`` rather than a .get() default: callers such as argparse pass
        # the key with an explicit None, which .get() would return as-is.
        self._mysql_database = kwargs.get("mysql_database") or "transfer"
        self._mysql_host = kwargs.get("mysql_host") or "localhost"
        self._mysql_integer_type = kwargs.get("mysql_integer_type") or "int(11)"
        self._mysql_decimal_type = (
            kwargs.get("mysql_decimal_type") or "decimal(40,20)"
        )
        self._mysql_string_type = kwargs.get("mysql_string_type") or "varchar(300)"

        self._reconnect_on_failure = kwargs.get("reconnect_on_failure", True)
        self._skip_existing_tables = kwargs.get("skip_existing_tables", False)
        self._clear_tables_before_transfer = kwargs.get(
            "clear_tables_before_transfer", False
        )

        self._table_row_limit = kwargs.get("table_row_limit", False)
        if self._table_row_limit:
            self._table_row_limit = int(self._table_row_limit)

        # Include list: comma separated only, so names containing spaces
        # keep working as before.
        self._table_names = self._split_names(kwargs.get("table_names"), r",")
        pattern = kwargs.get("table_names_re")
        self._table_names_re = re.compile(pattern) if pattern else None

        # Ignore list: historically whitespace separated although documented
        # as comma separated, so both separators are accepted.
        self._ignore_table_names = self._split_names(
            kwargs.get("ignore_table_names"), r"[,\s]+"
        )
        pattern = kwargs.get("ignore_table_names_re")
        self._ignore_table_names_re = re.compile(pattern) if pattern else None

        self._sqlite = None
        self._mysql = None
        self.connect()

    @staticmethod
    def _split_names(value, separators):
        """Return the non-empty, stripped names found in ``value``.

        ``value`` may be a string (split on the ``separators`` regular
        expression), any other iterable of names, or a falsy value, which
        yields an empty list.
        """
        if not value:
            return []
        if isinstance(value, str):
            value = re.split(separators, value)
        return [name.strip() for name in value if name and name.strip()]

    @staticmethod
    def _quote_mysql_identifier(name):
        """Return ``name`` as a backtick-quoted MySQL identifier."""
        return "`{}`".format(str(name).replace("`", "``"))

    @staticmethod
    def _quote_sqlite_identifier(name):
        """Return ``name`` as a double-quoted SQLite identifier."""
        return '"{}"'.format(str(name).replace('"', '""'))

    def connect(self):
        """Connect to both SQLite and MySQL.

        Any connection left over from a previous call is closed first on a
        best-effort basis, so this method doubles as "reconnect". Selects the
        target MySQL database, creating it when it does not exist; any other
        error while selecting it terminates the process.
        """
        if getattr(self, "_sqlite", None) is not None:
            try:
                self._sqlite.close()
            except sqlite3.Error:
                pass
        self._sqlite = sqlite3.connect(self._sqlite_file)
        self._sqlite.row_factory = sqlite3.Row
        self._sqlite_cur = self._sqlite.cursor()

        if getattr(self, "_mysql", None) is not None:
            try:
                self._mysql.close()
            except Exception:  # pylint: disable=broad-except
                # The old connection may already be dead; closing it is only
                # a courtesy and must never prevent the reconnect.
                pass
        self._mysql = mysql.connector.connect(
            user=self._mysql_user,
            password=self._mysql_password,
            host=self._mysql_host,
        )
        self._mysql_cur = self._mysql.cursor(prepared=True)
        try:
            self._mysql.database = self._mysql_database
        except mysql.connector.Error as err:
            if err.errno == errorcode.ER_BAD_DB_ERROR:
                self._create_database()
            else:
                print(err)
                raise SystemExit(1)

    def _create_database(self):
        """Create the target MySQL database and make it the current one.

        Terminates the process with ``SystemExit(1)`` on any MySQL error.
        """
        try:
            self._mysql_cur.execute(
                "CREATE DATABASE IF NOT EXISTS {} DEFAULT CHARACTER SET 'utf8'".format(
                    self._quote_mysql_identifier(self._mysql_database)
                )
            )
            self._mysql_cur.close()
            self._mysql.commit()
            self._mysql.database = self._mysql_database
            self._mysql_cur = self._mysql.cursor(prepared=True)
        except mysql.connector.Error as err:
            print(
                "_create_database failed creating databse {}: {}".format(
                    self._mysql_database, err
                )
            )
            raise SystemExit(1)

    def _table_exists(self, table_name):
        """Return True when ``table_name`` exists in the target database."""
        # Parameterised, and restricted to the target schema so that a table
        # of the same name in another database is not mistaken for ours.
        self._mysql_cur.execute(
            "SELECT 1 FROM information_schema.tables "
            "WHERE table_schema = %s AND table_name = %s LIMIT 1",
            (self._mysql_database, table_name),
        )
        # fetchall() drains the result set so the cursor can be reused.
        return bool(self._mysql_cur.fetchall())

    def _ignore_table_name(self, table_name):
        """Return True when ``table_name`` must not be transferred.

        Rules, in order:

        1. A table listed in ``table_names`` is always transferred.
        2. When include filters are configured, a table that passes neither
           the name list nor ``table_names_re`` is ignored.
        3. Remaining tables are ignored when they are listed in
           ``ignore_table_names`` or match ``ignore_table_names_re``.
        """
        if self._table_names and table_name in self._table_names:
            return False
        if self._table_names_re:
            if not self._table_names_re.match(table_name):
                return True
        elif self._table_names:
            return True
        if self._ignore_table_names and table_name in self._ignore_table_names:
            return True
        if self._ignore_table_names_re and self._ignore_table_names_re.match(
            table_name
        ):
            return True
        return False

    def _get_type(self, type_name):
        """Map a declared SQLite column type to a MySQL column type.

        The declared type is matched by substring, in the spirit of SQLite's
        type-affinity rules, so parameterised or variant spellings such as
        ``VARCHAR(255)``, ``NVARCHAR`` or ``DECIMAL(10,2)`` are recognised.
        Anything unrecognised (including an empty declaration) falls back to
        the configured integer type.
        """
        declared = (type_name or "").upper()
        if "INT" in declared:
            return self._mysql_integer_type
        if any(token in declared for token in ("CHAR", "CLOB", "TEXT")):
            return self._mysql_string_type
        if "BLOB" in declared:
            return "blob"
        if any(token in declared for token in ("REAL", "FLOA", "DOUB")):
            return "double"
        if "DECIMAL" in declared or "NUMERIC" in declared:
            return self._mysql_decimal_type
        return self._mysql_integer_type

    def _build_create_table_sql(self, table_name):
        """Return the MySQL ``CREATE TABLE`` statement for an SQLite table.

        Column names, types, nullability and the primary key are read from
        ``PRAGMA table_info``.
        """
        quote = self._quote_mysql_identifier
        self._sqlite_cur.execute(
            "PRAGMA table_info({})".format(self._quote_sqlite_identifier(table_name))
        )
        columns = [dict(row) for row in self._sqlite_cur.fetchall()]
        # ``pk`` is the 1-based position of the column inside the primary
        # key (0 when it is not part of it), so sort on it to keep key order.
        pk_columns = sorted(
            (column for column in columns if column["pk"]),
            key=lambda column: column["pk"],
        )

        definitions = []
        for column in columns:
            parts = [
                quote(column["name"]),
                self._get_type(column["type"]),
                "NOT NULL" if column["notnull"] or column["pk"] else "NULL",
            ]
            # Only a single-column integer primary key may auto-increment.
            # NOTE: with MySQL's default SQL mode an inserted id of 0 in such
            # a column is replaced by the next sequence value.
            if (
                column["pk"]
                and len(pk_columns) == 1
                and "INT" in (column["type"] or "").upper()
            ):
                parts.append("AUTO_INCREMENT")
            definitions.append(" ".join(parts))

        # Tables without a primary key must not get an empty PRIMARY KEY ().
        if pk_columns:
            definitions.append(
                "PRIMARY KEY ({})".format(
                    ", ".join(quote(column["name"]) for column in pk_columns)
                )
            )
        return "CREATE TABLE IF NOT EXISTS {} ( {} ) ENGINE = InnoDB CHARACTER SET utf8".format(
            quote(table_name), ", ".join(definitions)
        )

    def _create_table(self, table_name):
        """Create ``table_name`` in MySQL, mirroring its SQLite definition.

        Terminates the process with ``SystemExit(1)`` on any MySQL error.
        """
        sql = self._build_create_table_sql(table_name)
        try:
            self._mysql_cur.execute(sql)
            self._mysql.commit()
        except mysql.connector.Error as err:
            print(
                "_create_table failed creating table {}: {}\nfrom:{}".format(
                    table_name, err, sql
                )
            )
            raise SystemExit(1)

    def _transfer_table_data(self, table_name):
        """Copy the rows of one SQLite table into the MySQL table of that name.

        On a MySQL error the failure is reported; the connections are then
        re-established when ``reconnect_on_failure`` is set, otherwise the
        process terminates with ``SystemExit(1)``.
        """
        self._sqlite_cur.execute(
            "SELECT * FROM {}".format(self._quote_sqlite_identifier(table_name))
        )
        columns = [description[0] for description in self._sqlite_cur.description]
        if self._table_row_limit:
            rows = self._sqlite_cur.fetchmany(size=self._table_row_limit)
        else:
            rows = self._sqlite_cur.fetchall()
        if not rows:
            return

        sql = "INSERT IGNORE INTO {table} ({fields}) VALUES ({placeholders})".format(
            table=self._quote_mysql_identifier(table_name),
            fields=", ".join(self._quote_mysql_identifier(name) for name in columns),
            placeholders=", ".join(["%s"] * len(columns)),
        )
        try:
            self._mysql_cur.executemany(sql, [tuple(row) for row in rows])
            self._mysql.commit()
        except mysql.connector.Error as err:
            print(
                "_insert_table_data failed inserting data into table {}: {}".format(
                    table_name, err
                )
            )
            if self._reconnect_on_failure:
                self.connect()
            else:
                raise SystemExit(1)

    def transfer(self):
        """Create and populate every selected SQLite table in MySQL.

        SQLite's internal ``sqlite_*`` tables are never transferred. Progress
        is reported on standard output.
        """
        self._sqlite_cur.execute(
            "SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%'"
        )
        # Materialise the names first: the SQLite cursor is reused below.
        table_names = [row["name"] for row in self._sqlite_cur.fetchall()]
        for table_name in table_names:
            if self._ignore_table_name(table_name):
                continue
            if self._skip_existing_tables and self._table_exists(table_name):
                print("Skipping existing table %r" % table_name)
                continue
            print("Transferring table {}".format(table_name))
            if self._clear_tables_before_transfer:
                self._mysql_cur.execute(
                    "DROP TABLE IF EXISTS {}".format(
                        self._quote_mysql_identifier(table_name)
                    )
                )
            self._create_table(table_name)
            self._transfer_table_data(table_name)
        print("Done!")
def main(argv=None):
    """Run the converter from the command line.

    Args:
        argv: Command-line arguments without the program name. Defaults to
            ``sys.argv[1:]``.

    Prints the usage text and exits with status 1 when no arguments are
    given; otherwise builds an ``SQLite3toMySQL`` converter from the parsed
    options and runs its ``transfer()``.
    """
    import argparse
    import sys

    argv = sys.argv[1:] if argv is None else list(argv)

    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--sqlite-file", dest="sqlite_file", default=None, help="SQLite3 db file"
    )
    parser.add_argument(
        "--mysql-user", dest="mysql_user", default=None, help="MySQL user"
    )
    parser.add_argument(
        "--mysql-password", dest="mysql_password", default=None, help="MySQL password"
    )
    # Default to a real name: an explicit None would be passed through to the
    # converter and override its own "transfer" default.
    parser.add_argument(
        "--mysql-database",
        dest="mysql_database",
        default="transfer",
        help="MySQL database name",
    )
    parser.add_argument(
        "--mysql-host", dest="mysql_host", default="localhost", help="MySQL host"
    )
    parser.add_argument(
        "--mysql-integer-type",
        dest="mysql_integer_type",
        default="int(11)",
        help="MySQL default integer field type",
    )
    parser.add_argument(
        "--mysql-decimal-type",
        dest="mysql_decimal_type",
        default="decimal(40,20)",
        help="MySQL default decimal field type",
    )
    parser.add_argument(
        "--mysql-string-type",
        dest="mysql_string_type",
        default="varchar(300)",
        help="MySQL default string field type",
    )
    # Reconnecting is the default; the positive flag is kept so existing
    # command lines still parse, and the negative flag turns it off.
    parser.add_argument(
        "--reconnect-on-failure",
        dest="reconnect_on_failure",
        action="store_true",
        default=True,
        help="Reconnect to databases on table load failure (default).",
    )
    parser.add_argument(
        "--no-reconnect-on-failure",
        dest="reconnect_on_failure",
        action="store_false",
        help="Exit instead of reconnecting on table load failure.",
    )
    parser.add_argument(
        "--skip-existing-tables",
        dest="skip_existing_tables",
        action="store_true",
        default=False,
        help="Do not transfer a table if it already exists.",
    )
    parser.add_argument(
        "--clear-table-before-transfer",
        "--clear-tables-before-transfer",
        dest="clear_tables_before_transfer",
        action="store_true",
        default=False,
        help="Clear the table before transfer.",
    )
    parser.add_argument(
        "--table-row-limit",
        dest="table_row_limit",
        type=int,
        default=0,
        help="Only transfer this many rows per table.",
    )
    parser.add_argument(
        "--table-names",
        dest="table_names",
        default="",
        help="Only load these table names (comma separated).",
    )
    parser.add_argument(
        "--table-names-re",
        dest="table_names_re",
        default="",
        help="Only load these table names (regular expression).",
    )
    parser.add_argument(
        "--ignore-table-names",
        dest="ignore_table_names",
        default="",
        help="ignore these table names (comma separated).",
    )
    parser.add_argument(
        "--ignore-table-names-re",
        dest="ignore_table_names_re",
        default="",
        help="ignore these table names (regular expression).",
    )

    if not argv:
        parser.print_help()
        sys.exit(1)

    options = vars(parser.parse_args(argv))
    # The help text promises a comma separated list; hand the names over
    # separated by whitespace as well so a converter that splits on
    # whitespace honours every name.
    options["ignore_table_names"] = " ".join(
        options["ignore_table_names"].replace(",", " ").split()
    )

    converter = SQLite3toMySQL(**options)
    converter.transfer()
# Run the command-line interface only when executed as a script.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment