Skip to content

Instantly share code, notes, and snippets.

@elprup
Created November 27, 2012 08:00
Show Gist options
  • Save elprup/4153049 to your computer and use it in GitHub Desktop.
Save elprup/4153049 to your computer and use it in GitHub Desktop.
MySQL tables svn-like sync between server
#!/bin/python
'''
MySQL tables svn-like sync between server
@ through ssh connect
@ incremental sync
@ can retry and redo
@ svn-style mode
@ timeout
version 1.2
modified 2012-12-11
usage:
checkout: set up initial mysql table information, copying the table from the remote mysql server to the local one
update update table information
'''
import re
import os
import logging
import subprocess
import threading
# Module-level side effect: lower the root logger's threshold to DEBUG so the
# debug/info messages emitted by the helpers below are not filtered out.
logging.getLogger().setLevel(logging.DEBUG)
class TableExistedExeption(Exception):
    '''Error for a table that is already present.

    The table name passed to the constructor is kept on ``self.table`` and
    is used to build the message returned by ``str()``.
    '''

    def __init__(self, table):
        # Hand the argument to Exception as well: this fills ``args`` so the
        # instance can be copied / pickled (both re-create it from ``args``).
        super(TableExistedExeption, self).__init__(table)
        self.table = table

    def __str__(self):
        # Wrap in a 1-tuple so a tuple-valued ``table`` is formatted as a
        # whole instead of being unpacked by the ``%`` operator.
        return '%s already existed.' % (self.table,)
class ConfigFileNotSetException(Exception):
    '''Error signalling that no config file name is available.

    Takes no constructor arguments; ``str()`` yields a fixed message.
    '''

    def __init__(self):
        # No payload to carry; initialise the base class with no arguments.
        Exception.__init__(self)

    def __str__(self):
        message = 'config file name not set'
        return message
class ConfigNotFoundException(Exception):
    '''Error for a config lookup that found no entry.

    The looked-up key passed to the constructor is kept on ``self.key`` and
    is embedded in the message returned by ``str()``.
    '''

    def __init__(self, key):
        # Hand the argument to Exception as well: this fills ``args`` so the
        # instance can be copied / pickled (both re-create it from ``args``).
        super(ConfigNotFoundException, self).__init__(key)
        self.key = key

    def __str__(self):
        # Wrap in a 1-tuple so a tuple-valued ``key`` is formatted as a whole
        # instead of being unpacked by the ``%`` operator.
        return 'config not found: %s' % (self.key,)
class CommandTimeoutException(Exception):
    '''Error signalling that a command did not finish within its timeout.

    Takes no constructor arguments; ``str()`` yields a fixed message.
    '''

    def __init__(self):
        # No payload to carry; initialise the base class with no arguments.
        Exception.__init__(self)

    def __str__(self):
        message = 'command timeout'
        return message
class Command(object):
    '''
    Run a shell command line with an optional timeout.

    refer jcollado code:http://stackoverflow.com/questions/1191374/subprocess-with-timeout

    Attributes:
    cmd     -- the command line, executed through the shell
    process -- the subprocess.Popen object of the latest run (None before)
    out     -- captured stdout of the latest run (None until one completes)
    '''

    def __init__(self, cmd):
        self.cmd = cmd
        self.process = None
        # Always defined, so a run that never collected output yields None
        # instead of failing with AttributeError.
        self.out = None

    def run(self, timeout):
        '''Execute the command and return what it wrote to stdout.

        timeout -- seconds to wait for completion; None waits indefinitely.

        Raises CommandTimeoutException when the command is still running
        after ``timeout`` seconds; the process is sent a terminate signal
        first.
        '''
        self.out = None
        # Spawn in the calling thread: ``self.process`` is then guaranteed to
        # exist when the timeout branch below needs it, and a failure to
        # spawn propagates to the caller instead of dying inside the worker.
        self.process = subprocess.Popen(self.cmd, shell=True, stdout=subprocess.PIPE)

        def target():
            logging.debug('Thread started')
            self.out = self.process.communicate()[0]
            logging.debug('Thread finished')

        thread = threading.Thread(target=target)
        thread.start()
        thread.join(timeout)
        if thread.is_alive():
            logging.debug('Terminating process')
            try:
                self.process.terminate()
            except OSError:
                # The process exited on its own between the liveness check
                # and the signal; nothing left to terminate.
                pass
            # NOTE(review): with shell=True the signal goes to the shell;
            # presumably children of a pipeline can outlive it - confirm.
            thread.join()
            raise CommandTimeoutException
        return self.out
def shell_exec(command, timeout=None, dryrun=False):
    '''Log a shell command line, run it, and return its captured stdout.

    command -- command line string, handed to Command
    timeout -- forwarded to Command.run
    dryrun  -- when true the command is only logged, not executed, and the
               function returns None
    '''
    logging.info('shell_exec command:'+command)
    if dryrun:
        output = None
    else:
        output = Command(command).run(timeout=timeout)
    logging.debug('shell_exec result: %s' % output)
    return output
def ssh_command(param, command=''):
    '''Build the ``ssh`` command line that runs ``command`` remotely.

    param   -- text placed right after ``ssh`` (the ssh arguments)
    command -- remote command; backslashes and double quotes in it are
               backslash-escaped before it is wrapped in double quotes

    Returns the assembled command line as a string; nothing is executed.
    '''
    # Backslashes first, so the ones added for the quotes are not doubled.
    escaped = command.replace('\\', '\\\\').replace('"', '\\"')
    return ' '.join(('ssh', param, '"' + escaped + '"'))
def mysql_command(conn, sql=''):
    '''Build a ``mysql`` client command line for a connection.

    conn -- object with ``user``, ``host``, ``database`` and ``password``
            attributes (password is expected to be a string or None)
    sql  -- statement passed with ``-e``; omitted when it is the empty
            string. Backslashes and double quotes in it are escaped.

    Returns the command line as a string; nothing is executed.
    '''
    pieces = ['mysql -u%s -h%s %s' % (conn.user, conn.host, conn.database)]
    # The password option is only added for a non-empty password.
    if conn.password:
        pieces.append(' -p%s' % conn.password)
    if sql != '':
        # Backslashes first, so the ones added for the quotes are not doubled.
        escaped_sql = sql.replace('\\', '\\\\').replace('"', '\\"')
        pieces.append(' -e "%s"' % escaped_sql)
    return ''.join(pieces)
def check_column_type(conn, table, column, ssh_param=None):
    '''Return the data type of a table column.

    conn      -- connection whose ``database`` names the schema to inspect
    table     -- table name
    column    -- column name
    ssh_param -- when set, the query is wrapped with ssh_command so that it
                 runs through ssh

    The type is read from the second line of the mysql client output (the
    first line is skipped as the header), so an IndexError is raised when
    the output has fewer than two lines.
    '''
    sql = r'''select data_type from information_schema.columns where table_name = "%s" and table_schema="%s" and column_name="%s"'''\
        % (table, conn.database, column)
    command = mysql_command(conn, sql)
    if ssh_param:
        # Fixed: this used the undefined name ``param`` and raised NameError
        # whenever ssh_param was supplied.
        command = ssh_command(ssh_param, command)
    columntype = shell_exec(command, dryrun=False).split('\n')[1].strip()
    logging.debug('check table %s field %s : %s' % (table, column, columntype))
    return columntype
def mysqldump_command(conn, table, create=False, where=None, drop=False):
    '''Build a ``mysqldump`` command line for one table.

    conn   -- object with ``user``, ``host``, ``database`` and ``password``
              attributes (password is expected to be a string or None)
    table  -- table to dump
    create -- when false, ``--no-create-info`` is added
    where  -- optional row filter passed with ``--where``; backslashes and
              double quotes in it are escaped
    drop   -- when true, ``--add-drop-table`` is added

    Returns the command line as a string; nothing is executed.
    '''
    pieces = ['mysqldump --compact --compress -u%s -h%s %s %s'
              % (conn.user, conn.host, conn.database, table)]
    # The password option is only added for a non-empty password.
    if conn.password:
        pieces.append(' -p%s' % conn.password)
    if not create:
        pieces.append(' --no-create-info')
    if drop:
        pieces.append(' --add-drop-table')
    if where:
        # Backslashes first, so the ones added for the quotes are not doubled.
        escaped_where = where.replace('\\', '\\\\').replace('"', '\\"')
        pieces.append(' --where "%s"' % escaped_where)
    return ''.join(pieces)
class Connection(object):
    '''MySQL connection settings: host, user, password and database.

    The textual form is ``user:password@host:database``; ``repr()`` produces
    it and ``load_from_string`` reads it back.
    '''

    def __init__(self, host=None, user=None, password=None, database=None):
        self.host, self.user, self.password, self.database = \
            host, user, password, database

    def __repr__(self):
        # NOTE: a field that is None is rendered as the text 'None'.
        return '%s:%s@%s:%s' % (self.user, self.password, self.host, self.database)

    def load_from_string(self, s):
        '''Fill the four fields from a ``user:password@host:database`` string.

        The user ends at the first ':', the database starts after the last
        ':' and the host after the last '@'; everything in between is the
        password. A password may therefore contain ':' or '@' and still
        round-trip through ``repr()`` (the user, host and database may not).

        Raises ValueError when a separator is missing.
        '''
        user, first_colon, rest = s.partition(':')
        password, at_sign, location = rest.rpartition('@')
        host, last_colon, database = location.rpartition(':')
        if not (first_colon and at_sign and last_colon):
            raise ValueError(
                'connection string must look like user:password@host:database: %r' % (s,))
        self.user, self.password, self.host, self.database = \
            user, password, host, database
class Config(object):
    '''key: (local_conn, table)
    value: [remote_conn, pk, last_pk]

    In-memory mapping persisted to a plain-text file with one entry per
    line: the five fields separated by whitespace, in the order
    ``local_conn table remote_conn pk last_pk``. Lines starting with '#'
    and lines that do not have exactly five fields are ignored on load.
    Because fields are split on whitespace, a field that itself contains
    whitespace does not survive a store/load cycle.
    '''

    def __init__(self, filename=None):
        '''Create the config, loading ``filename`` right away when given.'''
        self.filename = filename
        self.configdict = {}
        if filename is not None:
            self._load()

    def _load(self):
        '''Read ``self.filename`` into ``self.configdict``.

        A file that cannot be opened is not an error: it is logged and the
        config is left as it is.
        '''
        try:
            f = open(self.filename, 'r')
        except IOError:
            logging.info('config file not found, use empty config.')
            return
        # Iterate the file object directly (xreadlines() no longer exists in
        # Python 3) and let ``with`` close it even if parsing fails.
        with f:
            for line in f:
                if line.startswith('#'):
                    continue
                row = line.split()
                if len(row) == 5:
                    self.configdict[(row[0], row[1])] = [row[2], row[3], row[4]]

    def get(self, key):
        '''Return the value stored for ``key``, or None when there is none.'''
        return self.configdict.get(key, None)

    def set(self, key, value):
        '''Store ``value`` under ``key`` (in memory only; see ``store``).'''
        self.configdict[key] = value

    def store(self, filename=None):
        '''Write all entries to ``filename``, or to ``self.filename``.

        Every key part and value field is converted with ``str()``.
        Raises ConfigFileNotSetException when neither file name is set.
        '''
        store_file = self.filename if filename is None else filename
        if store_file is None:
            raise ConfigFileNotSetException()
        content = ''.join(
            ' '.join([str(k[0]), str(k[1]), str(v[0]), str(v[1]), str(v[2])]) + '\n'
            for k, v in self.configdict.items())
        with open(store_file, 'w') as f:
            f.write(content)
def checkout_table(remote_conn, local_conn, table, ssh_param=None, pk='id', dryrun=False, overwrite=False):
    '''Copy a table from the remote server to the local one and record it.

    remote_conn -- connection of the server to dump from
    local_conn  -- connection of the server to load into
    table       -- table name
    ssh_param   -- when set, the dump is run through ssh_command
    pk          -- column whose maximum value is recorded as the sync point
    dryrun      -- when true the copy command is only logged and nothing is
                   recorded (the local "show tables" check still runs)
    overwrite   -- when true an existing local table is dropped and
                   re-created instead of raising

    Raises TableExistedExeption when the table exists locally and
    ``overwrite`` is false.
    '''
    # check if table exists
    tables_result = shell_exec(command=mysql_command(local_conn, "show tables"), dryrun=False)
    # First output line is skipped as the header; blank lines are dropped.
    local_tables = [row for row in tables_result.split('\n')[1:] if len(row) > 0]
    logging.debug('local tables:%s' % local_tables)
    drop = False
    if table in local_tables:
        if not overwrite:
            raise TableExistedExeption(table)
        logging.warning('Table %s already existed, now overwrite it.' % table)
        drop = True

    # use pipe to redirect mysqldump from remote to local
    remote_command = mysqldump_command(remote_conn, table, create=True, drop=drop)
    local_command = mysql_command(local_conn)
    if ssh_param:
        remote_command = ssh_command(ssh_param, remote_command)
    command = remote_command + ' | ' + local_command
    shell_exec(command, timeout=600, dryrun=dryrun)

    # record relationship
    if dryrun:
        return
    config = Config('.mysqlsvn')
    # Highest pk now present locally: the starting point for later updates.
    last_pk = shell_exec(mysql_command(local_conn, "SELECT max(%s) FROM %s" % (pk, table))).split('\n')[1].strip()
    # Key and remote connection are stored as strings, matching what Config
    # loads from the file and what update_table looks up. Using the
    # Connection object itself as key would never replace an entry loaded
    # from the file, leaving two lines for the same table.
    config.set((str(local_conn), table), (str(remote_conn), pk, last_pk))
    config.store()
def update_table(local_conn, table, ssh_param=None, dryrun=False):
    '''Pull rows newer than the recorded sync point into the local table.

    local_conn -- connection of the local server; its string form together
                  with ``table`` is the key looked up in '.mysqlsvn'
    table      -- table name
    ssh_param  -- when set, the remote dump is run through ssh_command
    dryrun     -- when true the transfer command is only logged and the
                  recorded sync point is left untouched

    Raises ConfigNotFoundException when no entry exists for the table.
    '''
    # Look up which remote connection / pk / last value belong to this table.
    config = Config('.mysqlsvn')
    key = (str(local_conn), table)
    entry = config.get(key)
    if entry is None:
        raise ConfigNotFoundException(str(key))
    remote_conn_string, pk, last_pk = entry[0], entry[1], entry[2]

    # Rebuild the remote connection from its stored text form.
    remote_conn = Connection()
    remote_conn.load_from_string(remote_conn_string)

    # A varchar pk has to be compared against a quoted value.
    if check_column_type(local_conn, table, pk) == 'varchar':
        row_filter = "%s > \"%s\"" % (pk, last_pk)
    else:
        row_filter = "%s > %s" % (pk, last_pk)
    remote_command = mysqldump_command(remote_conn, table, create=False, where=row_filter)
    if ssh_param:
        remote_command = ssh_command(ssh_param, remote_command)

    # Pipe the remote dump straight into the local mysql client.
    command = ' | '.join([remote_command, mysql_command(local_conn)])
    shell_exec(command, timeout=60, dryrun=dryrun)
    if dryrun:
        return

    # Record the new highest pk as the next sync point.
    max_pk_sql = "SELECT max(%s) FROM %s" % (pk, table)
    last_pk = shell_exec(mysql_command(local_conn, max_pk_sql)).split('\n')[1].strip()
    config.set(key, (remote_conn, pk, last_pk))
    logging.debug(config.configdict)
    config.store()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment