-
-
Save arkarkark/88f525e10edd62c7e82788c736e85911 to your computer and use it in GitHub Desktop.
A simple Python 3 script to transfer the data from SQLite 3 to MySQL. Requires MySQL Connector/Python 2.0.4 or higher.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# pylint: disable=superfluous-parens,old-style-class,invalid-encoded-data,too-many-instance-attributes | |
# gist -u 88f525e10edd62c7e82788c736e85911 ~/bin/share/sqlite3mysql.py | |
# Originally From: https://gist.github.com/techouse/4deb94eee58a02d104c6 | |
__author__ = "Klemen Tušar" | |
__email__ = "techouse@gmail.com" | |
__copyright__ = "GPL" | |
__version__ = "1.0.1" | |
__date__ = "2015-09-12" | |
__status__ = "Production" | |
import mysql.connector, os.path, re, sqlite3 | |
from mysql.connector import errorcode | |
class SQLite3toMySQL:
    """
    Use this class to transfer an SQLite 3 database to MySQL.

    NOTE: Requires MySQL Connector/Python 2.0.4 or higher (https://dev.mysql.com/downloads/connector/python/)

    All configuration is passed to the constructor as keyword arguments
    (``sqlite_file``, ``mysql_user``, ``mysql_password``, ``mysql_database``,
    ``mysql_host``, the three ``mysql_*_type`` column types and the table
    filtering/limiting options). Call :meth:`transfer` to copy the tables.
    """

    def __init__(self, **kwargs):
        """Read the configuration from *kwargs* and connect to both databases.

        Prints a message and exits with status 1 when the SQLite file does
        not exist or when the MySQL user or password is not supplied.
        """
        self._properties = kwargs

        self._sqlite_file = kwargs.get("sqlite_file")
        # Test for a missing value first: os.path.isfile(None) raises TypeError.
        if not self._sqlite_file or not os.path.isfile(self._sqlite_file):
            print("SQLite file does not exist!")
            raise SystemExit(1)

        self._mysql_user = kwargs.get("mysql_user")
        if self._mysql_user is None:
            print("Please provide a MySQL user!")
            raise SystemExit(1)

        self._mysql_password = kwargs.get("mysql_password")
        if self._mysql_password is None:
            print("Please provide a MySQL password")
            raise SystemExit(1)

        # Use `or` instead of a .get() default: a caller may pass the key
        # explicitly with the value None, which .get() would return as-is.
        self._mysql_database = kwargs.get("mysql_database") or "transfer"
        self._mysql_host = kwargs.get("mysql_host") or "localhost"
        self._mysql_integer_type = kwargs.get("mysql_integer_type") or "int(11)"
        self._mysql_decimal_type = (
            kwargs.get("mysql_decimal_type") or "decimal(40,20)"
        )
        self._mysql_string_type = kwargs.get("mysql_string_type") or "varchar(300)"

        self._reconnect_on_failure = kwargs.get("reconnect_on_failure", True)
        self._skip_existing_tables = kwargs.get("skip_existing_tables", False)
        self._clear_tables_before_transfer = kwargs.get(
            "clear_tables_before_transfer", False
        )

        row_limit = kwargs.get("table_row_limit", False)
        self._table_row_limit = int(row_limit) if row_limit else False

        # Include filters.
        self._table_names = self._split_names(kwargs.get("table_names"))
        include_re = kwargs.get("table_names_re")
        self._table_names_re = re.compile(include_re) if include_re else None

        # Exclude filters. Commas are the documented separator; whitespace
        # is still accepted because that is what this option used to split on.
        self._ignore_table_names = self._split_names(
            kwargs.get("ignore_table_names"), r"[,\s]+"
        )
        exclude_re = kwargs.get("ignore_table_names_re")
        self._ignore_table_names_re = re.compile(exclude_re) if exclude_re else None

        self._sqlite = None
        self._mysql = None
        self.connect()

    @staticmethod
    def _split_names(value, pattern=r","):
        """Turn a separated string (or an iterable of names) into a clean list.

        Surrounding whitespace is stripped and empty entries are dropped.
        A false value (``None``, ``""``) yields an empty list.
        """
        if not value:
            return []
        if isinstance(value, str):
            value = re.split(pattern, value)
        return [name.strip() for name in value if name and name.strip()]

    @staticmethod
    def _quote_identifier(name, quote="`"):
        """Quote *name* as an SQL identifier, doubling embedded quote characters."""
        return quote + str(name).replace(quote, quote * 2) + quote

    def connect(self):
        """Connect to both SQLite and MySQL, closing any previous connections."""
        if self._sqlite is not None:
            try:
                self._sqlite.close()
            except sqlite3.Error:
                pass  # best effort: the old connection is being discarded anyway
        self._sqlite = sqlite3.connect(self._sqlite_file)
        self._sqlite.row_factory = sqlite3.Row
        self._sqlite_cur = self._sqlite.cursor()

        if self._mysql is not None:
            try:
                self._mysql.close()
            except (mysql.connector.Error, OSError):
                pass  # best effort: the old connection may already be dead
        self._mysql = mysql.connector.connect(
            user=self._mysql_user, password=self._mysql_password, host=self._mysql_host
        )
        self._mysql_cur = self._mysql.cursor(prepared=True)
        try:
            self._mysql.database = self._mysql_database
        except mysql.connector.Error as err:
            if err.errno != errorcode.ER_BAD_DB_ERROR:
                print(err)
                raise SystemExit(1) from err
            # The target database does not exist yet: create and select it.
            self._create_database()

    def _create_database(self):
        """Create the target MySQL database, select it and open a fresh cursor.

        Prints the error and exits with status 1 when MySQL reports a failure.
        """
        try:
            self._mysql_cur.execute(
                "CREATE DATABASE IF NOT EXISTS {} DEFAULT CHARACTER SET 'utf8'".format(
                    self._quote_identifier(self._mysql_database)
                )
            )
            self._mysql_cur.close()
            self._mysql.commit()
            self._mysql.database = self._mysql_database
            self._mysql_cur = self._mysql.cursor(prepared=True)
        except mysql.connector.Error as err:
            print(
                "_create_database failed creating database {}: {}".format(
                    self._mysql_database, err
                )
            )
            raise SystemExit(1) from err

    def _table_exists(self, table_name):
        """Return True when *table_name* already exists in the target database."""
        # Restrict the lookup to the target schema (a same-named table in
        # another database must not count) and bind the values as parameters.
        self._mysql_cur.execute(
            "SELECT 1 FROM information_schema.tables "
            "WHERE table_schema = %s AND table_name = %s LIMIT 1",
            (self._mysql_database, table_name),
        )
        # fetchall() drains the result so no unread rows are left on the cursor.
        return bool(self._mysql_cur.fetchall())

    def _ignore_table_name(self, table_name):
        """Return True when *table_name* must be skipped according to the filters.

        A name listed in ``table_names`` is always transferred. Otherwise,
        when ``table_names_re`` is set, the regular expression alone decides.
        A name that failed a ``table_names`` list (with no regular expression
        set) is skipped. Only when no include filter is configured are the
        ``ignore_table_names`` list and ``ignore_table_names_re`` consulted.
        """
        if self._table_names and table_name in self._table_names:
            return False
        if self._table_names_re:
            return self._table_names_re.match(table_name) is None
        if self._table_names:
            # An include list was given and this table is not on it.
            return True
        if self._ignore_table_names and table_name in self._ignore_table_names:
            return True
        return bool(
            self._ignore_table_names_re
            and self._ignore_table_names_re.match(table_name)
        )

    def _get_type(self, type_name):
        """Map a declared SQLite column type to the configured MySQL type.

        Matching is done on substrings of the upper-cased declaration, so
        parameterised types such as ``VARCHAR(255)`` or ``DECIMAL(10,2)``
        are recognised. Anything unrecognised (including an empty
        declaration) maps to the integer type.
        """
        declared = (type_name or "").upper()
        # Checked first so that e.g. "BIGINT" or "INTEGER" never falls through.
        if "INT" in declared:
            return self._mysql_integer_type
        if any(marker in declared for marker in ("CHAR", "CLOB", "TEXT")):
            return self._mysql_string_type
        if any(
            marker in declared
            for marker in ("DECIMAL", "NUMERIC", "REAL", "FLOA", "DOUB")
        ):
            return self._mysql_decimal_type
        return self._mysql_integer_type

    def _build_create_table_sql(self, table_name):
        """Return the MySQL ``CREATE TABLE`` statement mirroring an SQLite table.

        Column names, nullability and primary key come from
        ``PRAGMA table_info``. No ``AUTO_INCREMENT`` attribute is emitted.
        """
        self._sqlite_cur.execute(
            "PRAGMA table_info({})".format(self._quote_identifier(table_name, '"'))
        )
        definitions = []
        key_columns = []
        for row in self._sqlite_cur.fetchall():
            column = dict(row)
            if column["pk"]:
                # "pk" is the 1-based position of the column inside the key.
                key_columns.append((column["pk"], column["name"]))
            definitions.append(
                "{name} {type} {null}".format(
                    name=self._quote_identifier(column["name"]),
                    type=self._get_type(column["type"]),
                    null="NOT NULL" if column["notnull"] or column["pk"] else "NULL",
                )
            )
        # Only add the clause when there is a key: "PRIMARY KEY ()" is invalid SQL.
        if key_columns:
            definitions.append(
                "PRIMARY KEY ({})".format(
                    ", ".join(
                        self._quote_identifier(name) for _, name in sorted(key_columns)
                    )
                )
            )
        return "CREATE TABLE IF NOT EXISTS {} ( {} ) ENGINE = InnoDB CHARACTER SET utf8".format(
            self._quote_identifier(table_name), ", ".join(definitions)
        )

    def _create_table(self, table_name):
        """Create *table_name* in MySQL; print the error and exit(1) on failure."""
        sql = self._build_create_table_sql(table_name)
        try:
            self._mysql_cur.execute(sql)
            self._mysql.commit()
        except mysql.connector.Error as err:
            print(
                "_create_table failed creating table {}: {}\nfrom:{}".format(
                    table_name, err, sql
                )
            )
            raise SystemExit(1) from err

    def _transfer_table_data(self, table_name):
        """Copy the rows of one SQLite table into the same-named MySQL table.

        At most ``table_row_limit`` rows are copied when that option is set.
        On a MySQL error the message is printed; the connections are then
        re-opened when ``reconnect_on_failure`` is set, otherwise the
        process exits with status 1.
        """
        self._sqlite_cur.execute(
            "SELECT * FROM {}".format(self._quote_identifier(table_name, '"'))
        )
        columns = [description[0] for description in self._sqlite_cur.description]
        if self._table_row_limit:
            rows = self._sqlite_cur.fetchmany(size=self._table_row_limit)
        else:
            rows = self._sqlite_cur.fetchall()
        if not rows:
            return  # nothing to insert

        statement = "INSERT IGNORE INTO {table} ({fields}) VALUES ({placeholders})".format(
            table=self._quote_identifier(table_name),
            fields=", ".join(self._quote_identifier(name) for name in columns),
            placeholders=", ".join(["%s"] * len(columns)),
        )
        try:
            self._mysql_cur.executemany(statement, [tuple(row) for row in rows])
            self._mysql.commit()
        except mysql.connector.Error as err:
            print(
                "_insert_table_data failed inserting data into table {}: {}".format(
                    table_name, err
                )
            )
            if not self._reconnect_on_failure:
                raise SystemExit(1) from err
            self.connect()

    def transfer(self):
        """Create and populate in MySQL every SQLite table that passes the filters."""
        self._sqlite_cur.execute(
            "SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%'"
        )
        # Materialise the names first: the SQLite cursor is reused per table below.
        table_names = [row["name"] for row in self._sqlite_cur.fetchall()]
        for table_name in table_names:
            if self._ignore_table_name(table_name):
                continue
            if self._skip_existing_tables and self._table_exists(table_name):
                print("Skipping existing table %r" % table_name)
                continue
            print("Transferring table {}".format(table_name))
            if self._clear_tables_before_transfer:
                self._mysql_cur.execute(
                    "DROP TABLE IF EXISTS {}".format(self._quote_identifier(table_name))
                )
            self._create_table(table_name)
            self._transfer_table_data(table_name)
        print("Done!")
def _build_parser():
    """Return the argparse parser describing the command-line options."""
    import argparse

    parser = argparse.ArgumentParser(
        description="Transfer the data from an SQLite 3 database to MySQL."
    )
    parser.add_argument(
        "--sqlite-file", dest="sqlite_file", default=None, help="SQLite3 db file"
    )
    parser.add_argument(
        "--mysql-user", dest="mysql_user", default=None, help="MySQL user"
    )
    parser.add_argument(
        "--mysql-password", dest="mysql_password", default=None, help="MySQL password"
    )
    parser.add_argument(
        "--mysql-database",
        dest="mysql_database",
        default="transfer",
        help="MySQL database",
    )
    parser.add_argument(
        "--mysql-host", dest="mysql_host", default="localhost", help="MySQL host"
    )
    parser.add_argument(
        "--mysql-integer-type",
        dest="mysql_integer_type",
        default="int(11)",
        help="MySQL default integer field type",
    )
    parser.add_argument(
        "--mysql-decimal-type",
        dest="mysql_decimal_type",
        default="decimal(40,20)",
        help="MySQL default decimal field type",
    )
    parser.add_argument(
        "--mysql-string-type",
        dest="mysql_string_type",
        default="varchar(300)",
        help="MySQL default string field type",
    )
    # Reconnecting is the default. The flag used to be declared with
    # store_false, so passing it switched reconnecting off; it now means
    # what it says, and a --no- form is provided to turn it off.
    parser.add_argument(
        "--reconnect-on-failure",
        dest="reconnect_on_failure",
        action="store_true",
        default=True,
        help="Reconnect to databases on table load failure (default).",
    )
    parser.add_argument(
        "--no-reconnect-on-failure",
        dest="reconnect_on_failure",
        action="store_false",
        help="Exit instead of reconnecting on table load failure.",
    )
    parser.add_argument(
        "--skip-existing-tables",
        dest="skip_existing_tables",
        action="store_true",
        default=False,
        help="Do not transfer a table if it already exists.",
    )
    parser.add_argument(
        "--clear-table-before-transfer",
        "--clear-tables-before-transfer",
        dest="clear_tables_before_transfer",
        action="store_true",
        default=False,
        help="Clear the table before transfer.",
    )
    parser.add_argument(
        "--table-row-limit",
        dest="table_row_limit",
        type=int,
        default=0,
        help="Only transfer this many rows per table.",
    )
    parser.add_argument(
        "--table-names",
        dest="table_names",
        default="",
        help="Only load these table names (comma separated).",
    )
    parser.add_argument(
        "--table-names-re",
        dest="table_names_re",
        default="",
        help="Only load these table names (regular expression).",
    )
    parser.add_argument(
        "--ignore-table-names",
        dest="ignore_table_names",
        default="",
        help="ignore these table names (comma separated).",
    )
    parser.add_argument(
        "--ignore-table-names-re",
        dest="ignore_table_names_re",
        default="",
        help="ignore these table names (regular expression).",
    )
    return parser


def main(argv=None):
    """For use in standalone terminal form.

    Parses *argv* (``sys.argv[1:]`` when omitted), builds the converter and
    runs the transfer. With no arguments at all the help text is printed
    and the process exits with status 1.
    """
    import sys

    parser = _build_parser()
    if argv is None:
        argv = sys.argv[1:]
    if not argv:
        parser.print_help()
        sys.exit(1)

    options = vars(parser.parse_args(argv))
    # The option is documented as comma separated; turning commas into
    # spaces keeps it working with a converter that splits on whitespace.
    options["ignore_table_names"] = options["ignore_table_names"].replace(",", " ")

    converter = SQLite3toMySQL(**options)
    converter.transfer()


# Run the command-line interface only when executed as a script.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment