Skip to content

Instantly share code, notes, and snippets.

@miratcan
Created January 31, 2013 20:40
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save miratcan/4686232 to your computer and use it in GitHub Desktop.
Save miratcan/4686232 to your computer and use it in GitHub Desktop.
Database copier function built on lurker
from __future__ import print_function

import binascii
import datetime
class DbNotFound(Exception):
    """Raised when a requested database does not exist on a connection."""
try:  # Python 2
    _TEXT_TYPE = unicode
    _INTEGER_TYPES = (int, long)
except NameError:  # Python 3
    _TEXT_TYPE = str
    _INTEGER_TYPES = (int,)

# Leading keyword of the statement read from the 'Create Table' result column.
_CREATE_TABLE_PREFIX = "CREATE TABLE "


def _quote_identifier(name):
    """Return *name* wrapped in backquotes, doubling any embedded backquote."""
    return "`%s`" % name.replace("`", "``")


def _sql_literal(value):
    """Render one Python value as a SQL literal for an INSERT statement."""
    if value is None:
        return u"NULL"
    # bool must be tested before the integer types: it is a subclass of int.
    if isinstance(value, bool):
        return u"1" if value else u"0"
    if isinstance(value, _INTEGER_TYPES):
        return u"%s" % (value,)
    if isinstance(value, float):
        return _TEXT_TYPE(repr(value))
    # Raw binary data (Python 3 bytes, or bytearray) is sent as a hex literal.
    if isinstance(value, (bytes, bytearray)) and not isinstance(value, str):
        return u"X'%s'" % binascii.hexlify(value).decode("ascii")
    # Everything else (text, dates, decimals, ...) is sent as a quoted string.
    # NOTE(review): backslash escaping assumes the destination server does not
    # run with NO_BACKSLASH_ESCAPES -- confirm for your deployment.
    text = value if isinstance(value, _TEXT_TYPE) else u"%s" % (value,)
    text = text.replace(u"\\", u"\\\\").replace(u"'", u"\\'")
    return u"'%s'" % text


def copy(src_conn, src_dbname, dst_conn, dst_dbname=None, drop_table=True,
         drop_db=True, silent=False, tables_to_copy=None):
    """Copy the base tables of a database from one connection to another.

    Both connections must provide ``get_results(query)`` (iterable of
    dict-like rows), ``get_row(query)`` (one dict-like row) and
    ``execute(query)``; only those three methods are used.

    Args:
        src_conn: Connection the data is read from.
        src_dbname: Name of the database to copy.
        dst_conn: Connection the data is written to.
        dst_dbname: Name of the destination database. Defaults to
            ``src_dbname`` when empty.
        drop_table: When true, drop each destination table before creating
            it. When false, the table is created only if it does not exist
            and rows are inserted into whatever is already there.
        drop_db: When true, drop the destination database first.
        silent: When true, print no progress output.
        tables_to_copy: Optional collection of table names; when given, only
            source tables whose name is in it are copied.

    Returns:
        None.

    Raises:
        DbNotFound: If ``src_dbname`` is not listed by the source connection.
    """
    if not dst_dbname:
        dst_dbname = src_dbname

    db_names = [row['Database']
                for row in src_conn.get_results("show databases")]
    if src_dbname not in db_names:
        raise DbNotFound("Database '%s' is not found at source connection" % (
            src_dbname))

    src_db = _quote_identifier(src_dbname)
    dst_db = _quote_identifier(dst_dbname)

    tables_key = 'Tables_in_%s' % src_dbname
    table_names = [row[tables_key] for row in src_conn.get_results(
        "show full tables in %s where Table_Type ='BASE TABLE'" % src_db)]
    if tables_to_copy:
        table_names = [name for name in table_names if name in tables_to_copy]

    # Tables are created in listing order, which may not respect foreign-key
    # dependencies. The setting is left disabled on dst_conn afterwards.
    dst_conn.execute("SET FOREIGN_KEY_CHECKS = 0")
    if drop_db:
        dst_conn.execute("DROP DATABASE IF EXISTS %s" % dst_db)
    dst_conn.execute("CREATE DATABASE IF NOT EXISTS %s" % dst_db)

    for table_name in table_names:
        quoted_table = _quote_identifier(table_name)
        src_table = "%s.%s" % (src_db, quoted_table)
        dst_table = "%s.%s" % (dst_db, quoted_table)

        # Re-target the source's CREATE TABLE statement at the destination
        # database by qualifying the table name that follows the keyword.
        create_sql = src_conn.get_results(
            "show create table %s" % src_table)[0]['Create Table']
        table_definition = create_sql[len(_CREATE_TABLE_PREFIX):]
        if drop_table:
            dst_conn.execute("drop table if exists %s" % dst_table)
            create_head = _CREATE_TABLE_PREFIX
        else:
            create_head = _CREATE_TABLE_PREFIX + "IF NOT EXISTS "
        dst_conn.execute(
            "%s%s.%s" % (create_head, dst_db, table_definition))

        # Total is only used for progress reporting.
        num_of_rows = src_conn.get_row(
            "select count(*) from %s" % src_table)['count(*)']

        rows = src_conn.get_results("select * from %s" % src_table)
        for row_idx, values_dict in enumerate(rows, 1):
            # Snapshot the items once so columns and values stay aligned.
            columns = list(values_dict.items())
            query = u"INSERT INTO %s (%s) VALUES (%s)" % (
                dst_table,
                u", ".join(_quote_identifier(key) for key, _ in columns),
                u", ".join(_sql_literal(value) for _, value in columns))
            dst_conn.execute(query)

            # Report every 1000 rows and once more on the final row.
            if not silent and (
                    row_idx % 1000 == 0 or row_idx == num_of_rows):
                print("[%s] Copying table: %s [%s/%s rows completed]" % (
                    datetime.datetime.now(), table_name, row_idx,
                    num_of_rows))

        if num_of_rows and not silent:
            print()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment