MySQL .frm file reader
#
# Copyright (c) 2010, 2013, Oracle and/or its affiliates. All rights reserved.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; version 2 of the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
#
"""
This module contains abstractions of a MySQL Database object used by
multiple utilities.
"""
import multiprocessing
import os
import re
import sys
from collections import deque
from mysql.utilities.exception import UtilError, UtilDBError
from mysql.utilities.common.pattern_matching import REGEXP_QUALIFIED_OBJ_NAME
from mysql.utilities.common.options import obj2sql
from mysql.utilities.common.server import connect_servers, Server
from mysql.utilities.common.user import User
from mysql.utilities.common.sql_transform import (quote_with_backticks,
remove_backtick_quoting,
is_quoted_with_backticks)
# List of database objects for enumeration
_DATABASE, _TABLE, _VIEW, _TRIG, _PROC, _FUNC, _EVENT, _GRANT = "DATABASE", \
"TABLE", "VIEW", "TRIGGER", "PROCEDURE", "FUNCTION", "EVENT", "GRANT"
_OBJTYPE_QUERY = """
(
SELECT TABLE_TYPE as object_type
FROM INFORMATION_SCHEMA.TABLES
WHERE TABLES.TABLE_SCHEMA = '%(db_name)s' AND
TABLES.TABLE_NAME = '%(obj_name)s'
)
UNION
(
SELECT 'TRIGGER' as object_type
FROM INFORMATION_SCHEMA.TRIGGERS
WHERE TRIGGER_SCHEMA = '%(db_name)s' AND
TRIGGER_NAME = '%(obj_name)s'
)
UNION
(
SELECT TYPE as object_type
FROM mysql.proc
WHERE DB = '%(db_name)s' AND NAME = '%(obj_name)s'
)
UNION
(
SELECT 'EVENT' as object_type
FROM mysql.event
WHERE DB = '%(db_name)s' AND NAME = '%(obj_name)s'
)
"""
_DEFINITION_QUERY = """
SELECT %(columns)s
FROM INFORMATION_SCHEMA.%(table_name)s WHERE %(conditions)s
"""
_PARTITION_QUERY = """
SELECT PARTITION_NAME, SUBPARTITION_NAME, PARTITION_ORDINAL_POSITION,
SUBPARTITION_ORDINAL_POSITION, PARTITION_METHOD, SUBPARTITION_METHOD,
PARTITION_EXPRESSION, SUBPARTITION_EXPRESSION, PARTITION_DESCRIPTION
FROM INFORMATION_SCHEMA.PARTITIONS
WHERE TABLE_SCHEMA = '%(db)s' AND TABLE_NAME = '%(name)s'
"""
_COLUMN_QUERY = """
SELECT ORDINAL_POSITION, COLUMN_NAME, COLUMN_TYPE, IS_NULLABLE,
COLUMN_DEFAULT, EXTRA, COLUMN_COMMENT, COLUMN_KEY
FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_SCHEMA = '%(db)s' AND TABLE_NAME = '%(name)s'
"""
_FK_CONSTRAINT_QUERY = """
SELECT TABLE_NAME, CONSTRAINT_NAME, COLUMN_NAME, REFERENCED_TABLE_SCHEMA,
REFERENCED_TABLE_NAME, REFERENCED_COLUMN_NAME, UPDATE_RULE, DELETE_RULE
FROM INFORMATION_SCHEMA.REFERENTIAL_CONSTRAINTS
JOIN INFORMATION_SCHEMA.KEY_COLUMN_USAGE
USING (CONSTRAINT_SCHEMA, CONSTRAINT_NAME, TABLE_NAME, REFERENCED_TABLE_NAME)
WHERE CONSTRAINT_SCHEMA = '{DATABASE!s}'
AND TABLE_NAME = '{TABLE!s}'
"""
_ALTER_TABLE_ADD_FK_CONSTRAINT = """
ALTER TABLE {DATABASE!s}.{TABLE!s} add CONSTRAINT `{CONSTRAINT_NAME!s}`
FOREIGN KEY (`{COLUMN_NAMES}`)
REFERENCES `{REFERENCED_DATABASE}`.`{REFERENCED_TABLE!s}`
(`{REFERENCED_COLUMNS!s}`)
ON UPDATE {UPDATE_RULE}
ON DELETE {DELETE_RULE}
"""
def _multiprocess_tbl_copy_task(copy_tbl_task):
"""Multiprocess copy table data method.
This method wraps the copy of the table's data to allow its concurrent
execution by a pool of processes.
copy_tbl_task[in] dictionary of values required by a process to
perform the table copy task, namely:
'source_srv': <dict with source connection values>,
'dest_srv': <dict with destination connection values>,
'source_db': <source database name>,
'target_db': <target database name>,
'table': <table to copy>,
'options': <dict of options>,
'cloning': <cloning flag>.
"""
# Get input to execute task.
source_srv = copy_tbl_task.get('source_srv')
dest_srv = copy_tbl_task.get('dest_srv')
source_db = copy_tbl_task.get('source_db')
target_db = copy_tbl_task.get('target_db')
table = copy_tbl_task.get('table')
options = copy_tbl_task.get('options')
cloning = copy_tbl_task.get('cloning')
# Execute copy table task.
# NOTE: Must handle any exception here, because worker processes will not
# propagate them to the main process.
try:
_copy_table_data(source_srv, dest_srv, source_db, target_db, table,
options, cloning)
except UtilError:
_, err, _ = sys.exc_info()
print("ERROR copying data for table '{0}': {1}".format(table,
err.errmsg))
def _copy_table_data(source_srv, destination_srv, db_name, new_db_name,
tbl_name, tbl_options, cloning, connections=1):
"""Copy the data of the specified table.
This method copies/clones all the data from a table to another (new)
database.
source_srv[in] Source server (Server instance or dict. with the
connection values).
destination_srv[in] Destination server (Server instance or dict. with the
connection values).
db_name[in] Name of the database with the table to copy.
new_db_name[in] Name of the destination database to copy the table.
tbl_name[in] Name of the table to copy.
tbl_options[in] Table options.
cloning[in] Cloning flag, in order to use a different method to
copy data on the same server
connections[in] Specify the use of multiple connections/processes to
copy the table data (rows). By default, only 1 is used.
Note: Multiprocessing option should be preferred.
"""
# Import table needed here to avoid circular import issues.
from mysql.utilities.common.table import Table
# Handle source and destination server instances or connection values.
# Note: For multiprocessing the use of connection values instead of a
# server instance is required to avoid internal errors.
if isinstance(source_srv, Server):
source = source_srv
else:
# Get source server instance from connection values.
conn_options = {
'quiet': True, # Avoid repeating output for multiprocessing.
'version': "5.1.30",
}
servers = connect_servers(source_srv, None, conn_options)
source = servers[0]
if isinstance(destination_srv, Server):
destination = destination_srv
else:
# Get destination server instance from connection values.
conn_options = {
'quiet': True, # Avoid repeating output for multiprocessing.
'version': "5.1.30",
}
servers = connect_servers(destination_srv, None, conn_options)
destination = servers[0]
# Copy table data.
if not tbl_options.get("quiet", False):
print("# Copying data for TABLE {0}.{1}".format(db_name,
tbl_name))
q_tbl_name = "{0}.{1}".format(quote_with_backticks(db_name),
quote_with_backticks(tbl_name))
tbl = Table(source, q_tbl_name, tbl_options)
if tbl is None:
raise UtilDBError("Cannot create table object before copy.", -1,
db_name)
tbl.copy_data(destination, cloning, new_db_name, connections)
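# Usage sketch (hypothetical connection values; not part of the original
# module). Connection dicts, rather than Server instances, are what the
# multiprocessing path passes in:
#
#   src = {'user': 'root', 'passwd': '', 'host': 'localhost', 'port': 3306}
#   dst = {'user': 'root', 'passwd': '', 'host': 'otherhost', 'port': 3306}
#   _copy_table_data(src, dst, 'world', 'world_copy', 'city',
#                    {'quiet': True}, cloning=False)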
class Database(object):
"""
The Database class encapsulates a database. The class has the following
capabilities:
- Check to see if the database exists
- Drop the database
- Create the database
- Clone the database
- Print CREATE statements for all objects
"""
obj_type = _DATABASE
def __init__(self, source, name, options=None):
"""Constructor
source[in] A Server object
name[in] Name of database
options[in] Dictionary of options for controlling what is included
and how operations are performed (e.g., "verbose")
"""
if options is None:
options = {}
self.source = source
# Keep database identifier considering backtick quotes
if is_quoted_with_backticks(name):
self.q_db_name = name
self.db_name = remove_backtick_quoting(self.q_db_name)
else:
self.db_name = name
self.q_db_name = quote_with_backticks(self.db_name)
self.verbose = options.get("verbose", False)
self.skip_tables = options.get("skip_tables", False)
self.skip_views = options.get("skip_views", False)
self.skip_triggers = options.get("skip_triggers", False)
self.skip_procs = options.get("skip_procs", False)
self.skip_funcs = options.get("skip_funcs", False)
self.skip_events = options.get("skip_events", False)
self.skip_grants = options.get("skip_grants", False)
self.skip_create = options.get("skip_create", False)
self.skip_data = options.get("skip_data", False)
self.exclude_patterns = options.get("exclude_patterns", None)
self.use_regexp = options.get("use_regexp", False)
self.skip_table_opts = options.get("skip_table_opts", False)
self.new_db = None
self.q_new_db = None
self.init_called = False
self.destination = None # Used for copy mode
self.cloning = False # Used for clone mode
self.query_options = { # Used for skipping buffered fetch of rows
'fetch': False,
'commit': False, # No COMMIT needed for DDL operations (default).
}
self.constraints = deque() # Used to store constraints to execute
# after table creation, deque is
# thread-safe
self.objects = []
self.new_objects = []
def exists(self, server=None, db_name=None):
"""Check to see if the database exists
server[in] A Server object
(optional) If omitted, operation is performed
using the source server connection.
db_name[in] database name
(optional) If omitted, operation is performed
on the class instance table name.
return True = database exists, False = database does not exist
"""
if not server:
server = self.source
db = None
if db_name:
db = db_name
else:
db = self.db_name
_QUERY = """
SELECT SCHEMA_NAME FROM INFORMATION_SCHEMA.SCHEMATA
WHERE SCHEMA_NAME = '%s'
"""
res = server.exec_query(_QUERY % db)
return (res is not None and len(res) >= 1)
def drop(self, server, quiet, db_name=None):
"""Drop the database
server[in] A Server object
quiet[in] ignore error on drop
db_name[in] database name
(optional) If omitted, operation is performed
on the class instance table name.
return True = database successfully dropped, False = error
"""
db = None
if db_name:
db = db_name if is_quoted_with_backticks(db_name) \
else quote_with_backticks(db_name)
else:
db = self.q_db_name
op_ok = False
if quiet:
try:
server.exec_query("DROP DATABASE %s" % (db),
self.query_options)
op_ok = True
except:
pass
else:
server.exec_query("DROP DATABASE %s" % (db),
self.query_options)
op_ok = True
return op_ok
def create(self, server, db_name=None, charset_name=None,
collation_name=None):
"""Create the database
server[in] A Server object
db_name[in] database name
(optional) If omitted, operation is performed
on the class instance table name.
return True = database successfully created, False = error
"""
if db_name:
db = db_name if is_quoted_with_backticks(db_name) \
else quote_with_backticks(db_name)
else:
db = self.q_db_name
specification = ""
if charset_name:
specification = " DEFAULT CHARACTER SET {0}".format(charset_name)
if collation_name:
specification = "{0} DEFAULT COLLATE {0}".format(specification,
collation_name)
query_create_db = "CREATE DATABASE {0} {1}".format(db, specification)
server.exec_query(query_create_db, self.query_options)
return True
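# For example (hypothetical names), create(server, 'mydb', 'utf8',
# 'utf8_general_ci') issues a statement equivalent to:
#   CREATE DATABASE `mydb` DEFAULT CHARACTER SET utf8 DEFAULT COLLATE utf8_general_ci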
def __make_create_statement(self, obj_type, obj):
"""Construct a CREATE statement for a database object.
This method will get the CREATE statement from the method
get_create_statement() and also replace all occurrences of the
old database name with the new.
obj_type[in] Object type (string) e.g. DATABASE
obj[in] A row from the get_db_objects() method
that contains the elements of the object
Note: This does not work for tables.
Returns the CREATE string
"""
if not self.new_db:
self.new_db = self.db_name
self.q_new_db = self.q_db_name
create_str = None
# Tables are not supported
if obj_type == _TABLE and self.cloning:
return None
# Grants are a different animal!
if obj_type == _GRANT:
if obj[3]:
create_str = "GRANT %s ON %s.%s TO %s" % \
(obj[1], self.q_new_db, obj[3], obj[0])
else:
create_str = "GRANT %s ON %s.* TO %s" % \
(obj[1], self.q_new_db, obj[0])
else:
create_str = self.get_create_statement(self.db_name,
obj[0], obj_type)
if self.new_db != self.db_name:
# Replace the occurrences of the old database name (quoted with
# backticks) with the new one when preceded by: a whitespace
# character, comma or optionally a left parentheses.
create_str = re.sub(
r"(\s|,)(\(?){0}\.".format(self.q_db_name),
r"\1\2{0}.".format(self.q_new_db),
create_str
)
# Replace the occurrences of the old database name (without
# backticks) with the new one when preceded by: a whitespace
# character, comma or optionally a left parentheses and
# surrounded by single or double quotes.
create_str = re.sub(
r"(\s|,)(\(?)(\"|\'?){0}(\"|\'?)\.".format(self.db_name),
r"\1\2\3{0}\4.".format(self.new_db),
create_str
)
return create_str
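# Minimal standalone sketch of the backtick-quoted rename performed above
# (hypothetical database names):
#
#   >>> re.sub(r"(\s|,)(\(?)`old_db`\.", r"\1\2`new_db`.",
#   ...        "CREATE VIEW v1 AS SELECT a FROM `old_db`.`t1`")
#   'CREATE VIEW v1 AS SELECT a FROM `new_db`.`t1`'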
def __add_db_objects(self, obj_type):
"""Get a list of objects from a database based on type.
This method retrieves the list of objects for a specific object
type and adds it to the class' master object list.
obj_type[in] Object type (string) e.g. DATABASE
"""
rows = self.get_db_objects(obj_type)
if rows:
for row in rows:
tup = (obj_type, row)
self.objects.append(tup)
def init(self):
"""Get all objects for the database based on options set.
This method initializes the database object with a list of all
objects except those object types that are excluded. It calls
the helper method self.__add_db_objects() for each type of
object.
NOTE: This method must be called before the copy method. A
guard is in place to ensure this.
"""
self.init_called = True
# Get tables
if not self.skip_tables:
self.__add_db_objects(_TABLE)
# Get functions
if not self.skip_funcs:
self.__add_db_objects(_FUNC)
# Get stored procedures
if not self.skip_procs:
self.__add_db_objects(_PROC)
# Get views
if not self.skip_views:
self.__add_db_objects(_VIEW)
# Get triggers
if not self.skip_triggers:
self.__add_db_objects(_TRIG)
# Get events
if not self.skip_events:
self.__add_db_objects(_EVENT)
# Get grants
if not self.skip_grants:
self.__add_db_objects(_GRANT)
def __drop_object(self, obj_type, name):
"""Drop a database object.
Attempts a quiet drop of a database object (no errors are
printed).
obj_type[in] Object type (string) e.g. DATABASE
name[in] Name of the object
"""
if self.verbose:
print "# Dropping new object %s %s.%s" % \
(obj_type, self.new_db, name)
drop_str = "DROP %s %s.%s" % \
(obj_type, self.q_new_db, name)
# Suppress the error on drop
if self.cloning:
try:
self.source.exec_query(drop_str, self.query_options)
except UtilError:
if self.verbose:
print("# WARNING: Unable to drop {0} from {1} database "
"(object may not exist): {2}".format(name,
"source",
drop_str))
else:
try:
self.destination.exec_query(drop_str, self.query_options)
except UtilError:
if self.verbose:
print("# WARNING: Unable to drop {0} from {1} database "
"(object may not exist): {2}".format(name,
"destination",
drop_str))
def __create_object(self, obj_type, obj, show_grant_msg,
quiet=False, new_engine=None, def_engine=None):
"""Create a database object.
obj_type[in] Object type (string) e.g. DATABASE
obj[in] A row from the get_db_object_names() method
that contains the elements of the object
show_grant_msg[in] If true, display diagnostic information
quiet[in] do not print informational messages
new_engine[in] Use this engine if not None for object
def_engine[in] If target storage engine doesn't exist, use
this engine.
Note: will handle exception and print error if query fails
"""
if obj_type == _TABLE and self.cloning:
obj_name = quote_with_backticks(obj[0])
create_list = ["CREATE TABLE {0!s}.{1!s} LIKE {2!s}.{1!s}".format(
self.q_new_db, obj_name, self.q_db_name)
]
else:
create_list = [self.__make_create_statement(obj_type, obj)]
if obj_type == _TABLE:
may_skip_fk = False # Check possible issues with FK Constraints
obj_name = quote_with_backticks(obj[0])
tbl_name = "%s.%s" % (self.q_new_db, obj_name)
create_list = self.destination.substitute_engine(tbl_name,
create_list[0],
new_engine,
def_engine,
quiet)
# Get storage engines from the source table and destination table
# If the source table's engine is INNODB and the destination is
# not, we will lose any FK constraints that may exist
src_eng = self.get_object_definition(self.q_db_name,
obj[0], obj_type)[0][0][2]
dest_eng = None
# Information about the engine is always in the last statement of
# the list, be it a regular create table statement or a create
# table; alter table statement.
i = create_list[-1].find("ENGINE=")
if i > 0:
j = create_list[-1].find(" ", i)
dest_eng = create_list[-1][i + 7:j]
dest_eng = dest_eng or src_eng
if src_eng.upper() == 'INNODB' and dest_eng.upper() != 'INNODB':
may_skip_fk = True
string = "# Copying"
if not quiet:
if obj_type == _GRANT:
if show_grant_msg:
print "%s GRANTS from %s" % (string, self.db_name)
else:
print "%s %s %s.%s" % \
(string, obj_type, self.db_name, obj[0])
if self.verbose:
print("; ".join(create_list))
try:
self.destination.exec_query("USE %s" % self.q_new_db,
self.query_options)
except:
pass
for stm in create_list:
try:
self.destination.exec_query(stm, self.query_options)
except Exception as e:
raise UtilDBError("Cannot operate on {0} object."
" Error: {1}".format(obj_type, e.errmsg),
-1, self.db_name)
# Look for foreign key constraints
if obj_type == _TABLE:
params = {
'DATABASE': self.db_name,
'TABLE': obj[0],
}
try:
query = _FK_CONSTRAINT_QUERY.format(**params)
fkey_constr = self.source.exec_query(query)
except Exception as e:
raise UtilDBError("Unable to obtain Foreign Key constraint "
"information for table {0}.{1}. "
"Error: {2}".format(self.db_name, obj[0],
e.errmsg), -1,
self.db_name)
# Get information about the foreign keys of the table being
# copied/cloned.
if fkey_constr and not may_skip_fk:
# Create a constraint dictionary with the constraint
# name as key
constr_dict = {}
# This list is used to ensure the same constraints are applied
# in the same order, because iterating the dictionary doesn't
# offer any guarantees regarding order, and Python 2.6 has
# no collections.OrderedDict
constr_lst = []
for fkey in fkey_constr:
params = constr_dict.get(fkey[1])
# in case the constraint entry already exists, it means it
# is composite, just update the columns names and
# referenced column fields
if params:
params['COLUMN_NAMES'].append(fkey[2])
params['REFERENCED_COLUMNS'].append(fkey[5])
else: # else create a new entry
constr_lst.append(fkey[1])
constr_dict[fkey[1]] = {
'DATABASE': self.new_db,
'TABLE': fkey[0],
'CONSTRAINT_NAME': fkey[1],
'COLUMN_NAMES': [fkey[2]],
'REFERENCED_DATABASE': fkey[3],
'REFERENCED_TABLE': fkey[4],
'REFERENCED_COLUMNS': [fkey[5]],
'UPDATE_RULE': fkey[6],
'DELETE_RULE': fkey[7],
}
# Iterate all the constraints and get the necessary parameters
# to create the query
for constr in constr_lst:
params = constr_dict[constr]
if self.cloning: # if it is a cloning table operation
# In case the foreign key is composite we need to join
# the columns for use in the ALTER TABLE query. Only
# useful when cloning
params['COLUMN_NAMES'] = '`,`'.join(
params['COLUMN_NAMES'])
params['REFERENCED_COLUMNS'] = '`,`'.join(
params['REFERENCED_COLUMNS'])
# If the foreign key points to a table under the
# database being cloned, change the referenced database
# name to the new cloned database
if params['REFERENCED_DATABASE'] == self.db_name:
params['REFERENCED_DATABASE'] = self.new_db
else:
print("# WARNING: The database being cloned has "
"external Foreign Key constraint "
"dependencies, {0}.{1} depends on {2}."
"{3}".format(params['DATABASE'],
params['TABLE'],
params['REFERENCED_DATABASE'],
params['REFERENCED_TABLE'])
)
query = _ALTER_TABLE_ADD_FK_CONSTRAINT.format(**params)
# Store constraint query for later execution
self.constraints.append(query)
if self.verbose:
print(query)
else: # if we are copying
if params['REFERENCED_DATABASE'] != self.db_name:
# if the table being copied has dependencies
# to external databases
print("# WARNING: The database being copied has "
"external Foreign Key constraint "
"dependencies, {0}.{1} depends on {2}."
"{3}".format(params['DATABASE'],
params['TABLE'],
params['REFERENCED_DATABASE'],
params['REFERENCED_TABLE'])
)
elif fkey_constr and may_skip_fk:
print("# WARNING: FOREIGN KEY constraints for table {0}.{1} "
"are missing because the new storage engine for "
"the table is not InnoDB".format(self.new_db, obj[0]))
def __apply_constraints(self):
"""This method applies to the database the constraints stored in the
self.constraints instance variable
"""
# Enable Foreign Key Checks to prevent the swapping of
# RESTRICT referential actions with NO ACTION
query_opts = {'fetch': False, 'commit': False}
self.destination.exec_query("SET FOREIGN_KEY_CHECKS=1", query_opts)
# while constraint queue is not empty
while self.constraints:
try:
query = self.constraints.pop()
except IndexError:
# queue is empty, exit the loop
break
if self.verbose:
print(query)
try:
self.destination.exec_query(query, query_opts)
except Exception as err:
raise UtilDBError("Unable to execute constraint query "
"{0}. Error: {1}".format(query, err.errmsg),
-1, self.new_db)
# Turn Foreign Key Checks off again
self.destination.exec_query("SET FOREIGN_KEY_CHECKS=0", query_opts)
def copy_objects(self, new_db, options, new_server=None,
connections=1, check_exists=True):
"""Copy the database objects.
This method will copy a database and all of its objects and data
to another, new database. Options set at instantiation will determine
if there are objects that are excluded from the copy. Likewise,
the method will also skip data if that option was set and process
an input file with INSERT statements if that option was set.
The method can also be used to copy a database to another server
by providing the new server object (new_server). Copy to the same
name by setting new_db = old_db or as a new database.
new_db[in] Name of the new database
options[in] Options for copy e.g. force, etc.
new_server[in] Connection to another server for copying the db
Default is None (copy to same server - clone)
connections[in] Number of threads(connections) to use for insert
check_exists[in] If True, check for database existence before copy
Default is True
"""
# Must call init() first!
# Guard for init() prerequisite
assert self.init_called, "You must call db.init() before " + \
"db.copy_objects()."
grant_msg_displayed = False
if new_db:
# Assign new database identifier considering backtick quotes.
if is_quoted_with_backticks(new_db):
self.q_new_db = new_db
self.new_db = remove_backtick_quoting(new_db)
else:
self.new_db = new_db
self.q_new_db = quote_with_backticks(new_db)
else:
# If new_db is not defined use the same as source database.
self.new_db = self.db_name
self.q_new_db = self.q_db_name
self.destination = new_server
# We know we're cloning if the destination server is the source server.
self.cloning = (new_server == self.source)
if self.cloning:
self.destination = self.source
# Check to see if database exists
if check_exists:
if self.cloning:
exists = self.exists(self.source, new_db)
drop_server = self.source
else:
exists = self.exists(self.destination, new_db)
drop_server = self.destination
if exists:
if options.get("force", False):
self.drop(drop_server, True, new_db)
elif not self.skip_create:
raise UtilDBError("destination database exists. Use "
"--force to overwrite existing "
"database.", -1, new_db)
db_name = self.db_name
definition = self.get_object_definition(db_name, db_name, _DATABASE)
_, character_set, collation, _ = definition[0]
# Create new database first
if not self.skip_create:
if self.cloning:
self.create(self.source, new_db, character_set,
collation)
else:
self.create(self.destination, new_db, character_set,
collation)
# Create the objects in the new database
for obj in self.objects:
# Drop object if --force specified and database not dropped
# Grants do not need to be dropped for overwriting
if options.get("force", False) and obj[0] != _GRANT:
obj_name = quote_with_backticks(obj[1][0])
self.__drop_object(obj[0], obj_name)
# Create the object
self.__create_object(obj[0], obj[1], not grant_msg_displayed,
options.get("quiet", False),
options.get("new_engine", None),
options.get("def_engine", None))
if obj[0] == _GRANT and not grant_msg_displayed:
grant_msg_displayed = True
# After object creation, add the constraints
if self.constraints:
self.__apply_constraints()
def copy_data(self, new_db, options, new_server=None, connections=1,
src_con_val=None, dest_con_val=None):
"""Copy the data for the tables.
This method will copy the data for all of the tables to another, new
database. The method will process an input file with INSERT statements
if the option was selected by the caller.
new_db[in] Name of the new database
options[in] Options for copy e.g. force, etc.
new_server[in] Connection to another server for copying the db
Default is None (copy to same server - clone)
connections[in] Number of threads(connections) to use for insert
src_con_val[in] Dict. with the connection values of the source
server (required for multiprocessing).
dest_con_val[in] Dict. with the connection values of the
destination server (required for multiprocessing).
"""
# Must call init() first!
# Guard for init() prerequisite
assert self.init_called, "You must call db.init() before " + \
"db.copy_data()."
if self.skip_data:
return
self.destination = new_server
# We know we're cloning if the destination server is the source server.
self.cloning = (new_server == self.source)
if self.cloning:
self.destination = self.source
quiet = options.get("quiet", False)
tbl_options = {
'verbose': self.verbose,
'get_cols': True,
'quiet': quiet
}
copy_tbl_tasks = []
table_names = [obj[0] for obj in self.get_db_objects(_TABLE)]
for tblname in table_names:
# Check multiprocess table copy (only on POSIX systems).
if options['multiprocess'] > 1 and os.name == 'posix':
# Create copy task.
copy_task = {
'source_srv': src_con_val,
'dest_srv': dest_con_val,
'source_db': self.db_name,
'target_db': new_db,
'table': tblname,
'options': tbl_options,
'cloning': self.cloning,
}
copy_tbl_tasks.append(copy_task)
else:
# Copy data from a table (no multiprocessing).
_copy_table_data(self.source, self.destination, self.db_name,
new_db, tblname, tbl_options, self.cloning)
# Copy tables concurrently.
if copy_tbl_tasks:
# Create process pool.
workers_pool = multiprocessing.Pool(
processes=options['multiprocess']
)
# Concurrently export tables.
workers_pool.map_async(_multiprocess_tbl_copy_task, copy_tbl_tasks)
workers_pool.close()
# Wait for all tasks to be completed by workers.
workers_pool.join()
def get_create_statement(self, db, name, obj_type):
"""Return the create statement for the object
db[in] Database name
name[in] Name of the object
obj_type[in] Object type (string) e.g. DATABASE
Note: this is used to form the correct SHOW command
Returns create statement
"""
# Quote database and object name with backticks.
q_name = (name if is_quoted_with_backticks(name)
else quote_with_backticks(name))
if obj_type == _DATABASE:
name_str = q_name
else:
q_db = (db if is_quoted_with_backticks(db)
else quote_with_backticks(db))
# Switch the default database to execute the
# SHOW CREATE statement without needing to specify the database
# This is for 5.1 compatibility reasons:
try:
self.source.exec_query("USE {0}".format(q_db),
self.query_options)
except UtilError as err:
raise UtilDBError("ERROR: Couldn't change "
"default database: {0}".format(err.errmsg))
name_str = q_name
# Retrieve the CREATE statement.
row = self.source.exec_query(
"SHOW CREATE {0} {1}".format(obj_type, name_str)
)
create_statement = None
if row:
if obj_type == _TABLE or obj_type == _VIEW or \
obj_type == _DATABASE:
create_statement = row[0][1]
elif obj_type == _EVENT:
create_statement = row[0][3]
else:
create_statement = row[0][2]
# Remove all table options from the CREATE statement (if requested).
if self.skip_table_opts and obj_type == _TABLE:
# First, get partition options.
create_tbl, sep, part_opts = create_statement.rpartition('\n/*')
# Handle situation where no partition options are found.
if not create_tbl:
create_tbl = part_opts
part_opts = ''
else:
part_opts = "{0}{1}".format(sep, part_opts)
# Then, separate table definitions from table options.
create_tbl, sep, _ = create_tbl.rpartition(') ')
# Reconstruct CREATE statement without table options.
create_statement = "{0}{1}{2}".format(create_tbl, sep, part_opts)
return create_statement
def get_create_table(self, db, table):
"""Return the create table statement for the given table.
This method returns the CREATE TABLE statement for the given table with
or without the table options, according to the Database object
property 'skip_table_opts'.
db[in] Database name.
table[in] Table name.
Returns a tuple with the CREATE TABLE statement and table options
(or None). If skip_table_opts=True the CREATE statement does not
include the table options that are returned separately, otherwise the
table options are included in the CREATE statement and None is returned
as the second tuple element.
"""
# Quote database and table name with backticks.
q_table = (table if is_quoted_with_backticks(table)
else quote_with_backticks(table))
q_db = db if is_quoted_with_backticks(db) else quote_with_backticks(db)
# Retrieve CREATE TABLE.
try:
row = self.source.exec_query(
"SHOW CREATE TABLE {0}.{1}".format(q_db, q_table)
)
create_tbl = row[0][1]
except UtilError as err:
raise UtilDBError("Error retrieving CREATE TABLE for {0}.{1}: "
"{2}".format(q_db, q_table, err.errmsg))
# Separate table options from table definition.
tbl_opts = None
if self.skip_table_opts:
# First, get partition options.
create_tbl, sep, part_opts = create_tbl.rpartition('\n/*')
# Handle situation where no partition options are found.
if not create_tbl:
create_tbl = part_opts
part_opts = ''
else:
part_opts = "{0}{1}".format(sep, part_opts)
# Then, separate table definitions from table options.
create_tbl, sep, tbl_opts = create_tbl.rpartition(') ')
# Reconstruct CREATE TABLE without table options.
create_tbl = "{0}{1}{2}".format(create_tbl, sep, part_opts)
return create_tbl, tbl_opts
def get_table_options(self, db, table):
"""Return the table options.
This method returns the list of used table options (from the CREATE
TABLE statement).
db[in] Database name.
table[in] Table name.
Returns a list of table options.
For example: ['AUTO_INCREMENT=5','ENGINE=InnoDB']
"""
# Quote database and table name with backticks.
q_table = (table if is_quoted_with_backticks(table)
else quote_with_backticks(table))
q_db = db if is_quoted_with_backticks(db) else quote_with_backticks(db)
# Retrieve CREATE TABLE statement.
try:
row = self.source.exec_query(
"SHOW CREATE TABLE {0}.{1}".format(q_db, q_table)
)
create_tbl = row[0][1]
except UtilError as err:
raise UtilDBError("Error retrieving CREATE TABLE for {0}.{1}: "
"{2}".format(q_db, q_table, err.errmsg))
# First, separate partition options.
create_tbl, _, part_opts = create_tbl.rpartition('\n/*')
# Handle situation where no partition options are found.
create_tbl = part_opts if not create_tbl else create_tbl
# Then, separate table options from table definition.
create_tbl, _, tbl_opts = create_tbl.rpartition(') ')
table_options = tbl_opts.split()
return table_options
def get_object_definition(self, db, name, obj_type):
"""Return a list of the object's creation metadata.
This method queries the INFORMATION_SCHEMA or MYSQL database for the
row-based (list) description of the object. This is similar to the
output of EXPLAIN <object>.
db[in] Database name
name[in] Name of the object
obj_type[in] Object type (string) e.g. DATABASE
Note: this is used to form the correct SHOW command
Returns list - object definition, None if db.object does not exist
"""
definition = []
from_name = None
condition = None
# Remove objects backticks if needed
db = remove_backtick_quoting(db) \
if is_quoted_with_backticks(db) else db
name = remove_backtick_quoting(name) \
if is_quoted_with_backticks(name) else name
if obj_type == _DATABASE:
columns = 'SCHEMA_NAME, DEFAULT_CHARACTER_SET_NAME, ' + \
'DEFAULT_COLLATION_NAME, SQL_PATH'
from_name = 'SCHEMATA'
condition = "SCHEMA_NAME = '%s'" % name
elif obj_type == _TABLE:
columns = 'TABLE_SCHEMA, TABLE_NAME, ENGINE, AUTO_INCREMENT, ' + \
'AVG_ROW_LENGTH, CHECKSUM, TABLE_COLLATION, ' + \
'TABLE_COMMENT, ROW_FORMAT, CREATE_OPTIONS'
from_name = 'TABLES'
condition = "TABLE_SCHEMA = '%s' AND TABLE_NAME = '%s'" % \
(db, name)
elif obj_type == _VIEW:
columns = 'TABLE_SCHEMA, TABLE_NAME, VIEW_DEFINITION, ' + \
'CHECK_OPTION, DEFINER, SECURITY_TYPE'
from_name = 'VIEWS'
condition = "TABLE_SCHEMA = '%s' AND TABLE_NAME = '%s'" % \
(db, name)
elif obj_type == _TRIG:
columns = 'TRIGGER_SCHEMA, TRIGGER_NAME, EVENT_MANIPULATION, ' + \
'EVENT_OBJECT_TABLE, ACTION_STATEMENT, ' + \
'ACTION_TIMING, DEFINER'
from_name = 'TRIGGERS'
condition = "TRIGGER_SCHEMA = '%s' AND TRIGGER_NAME = '%s'" % \
(db, name)
elif obj_type == _PROC or obj_type == _FUNC:
columns = 'ROUTINE_SCHEMA, ROUTINE_NAME, ROUTINE_DEFINITION, ' + \
'ROUTINES.SQL_DATA_ACCESS, ROUTINES.SECURITY_TYPE, ' + \
'ROUTINE_COMMENT, ROUTINES.DEFINER, param_list, ' + \
'DTD_IDENTIFIER, ROUTINES.IS_DETERMINISTIC'
from_name = 'ROUTINES JOIN mysql.proc ON ' + \
'ROUTINES.ROUTINE_SCHEMA = proc.db AND ' + \
'ROUTINES.ROUTINE_NAME = proc.name AND ' + \
'ROUTINES.ROUTINE_TYPE = proc.type '
condition = "ROUTINE_SCHEMA = '%s' AND ROUTINE_NAME = '%s'" % \
(db, name)
if obj_type == _PROC:
typ = 'PROCEDURE'
else:
typ = 'FUNCTION'
condition += " AND ROUTINE_TYPE = '%s'" % typ
elif obj_type == _EVENT:
columns = ('EVENT_SCHEMA, EVENT_NAME, DEFINER, EVENT_DEFINITION, '
'EVENT_TYPE, INTERVAL_FIELD, INTERVAL_VALUE, STATUS, '
'ON_COMPLETION, STARTS, ENDS')
from_name = 'EVENTS'
condition = "EVENT_SCHEMA = '%s' AND EVENT_NAME = '%s'" % \
(db, name)
if from_name is None:
raise UtilError('Attempting to get definition from unknown object '
'type = %s.' % obj_type)
values = {
'columns': columns,
'table_name': from_name,
'conditions': condition,
}
rows = self.source.exec_query(_DEFINITION_QUERY % values)
if rows != []:
# If this is a table, we need three types of information:
# basic info, column info, and partitions info
if obj_type == _TABLE:
values['name'] = name
values['db'] = db
basic_def = rows[0]
col_def = self.source.exec_query(_COLUMN_QUERY % values)
part_def = self.source.exec_query(_PARTITION_QUERY % values)
definition.append((basic_def, col_def, part_def))
else:
definition.append(rows[0])
return definition
def get_next_object(self):
"""Retrieve the next object in the database list.
This method is an iterator for retrieving the objects in the database
as specified in the init() method. You must call this method first.
Returns the next object in the list; raises StopIteration when the
list is exhausted.
"""
# Must call init() first!
# Guard for init() prerequisite
assert self.init_called, "You must call db.init() before db.copy()."
for obj in self.objects:
yield obj
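# Usage sketch (hypothetical server object; init() must be called first):
#
#   db = Database(server, 'world', {})
#   db.init()
#   for obj_type, row in db.get_next_object():
#       print("%s %s" % (obj_type, row[0]))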
def __build_exclude_patterns(self, exclude_param):
"""Return a string to add to where clause to exclude objects.
This method will add the conditions to exclude objects based on
name if there is a dot notation or by a search pattern as specified
by the options.
exclude_param[in] Name of column to check.
Returns (string) String to add to where clause or ""
"""
oper = 'NOT REGEXP' if self.use_regexp else 'NOT LIKE'
string = ""
for pattern in self.exclude_patterns:
# Check use of qualified object names (with backtick support).
if pattern.find(".") > 0:
use_backtick = is_quoted_with_backticks(pattern)
db, name = Database.parse_object_name(pattern)
if use_backtick:
# Remove backtick quotes.
db = remove_backtick_quoting(db)
name = remove_backtick_quoting(name)
if db == self.db_name: # Check if database name matches.
value = name # Only use the object name to exclude.
else:
value = pattern
# Otherwise directly use the specified pattern.
else:
value = pattern
if value:
# Append exclude condition to previous one(s).
string = "{0} AND {1} {2} {3}".format(string, exclude_param,
oper, obj2sql(value))
return string
def get_object_type(self, object_name):
"""Return the object type of an object
This method attempts to locate the object name among the objects
in the database. It returns the object type if found or None
if not found.
Note: different types of objects with the same name might exist in the
database.
object_name[in] Name of the object to find
Returns (list of strings) with the object types or None if not found
"""
object_types = None
# Remove object backticks if needed
obj_name = remove_backtick_quoting(object_name) \
if is_quoted_with_backticks(object_name) else object_name
res = self.source.exec_query(_OBJTYPE_QUERY %
{'db_name': self.db_name,
'obj_name': obj_name})
if res:
object_types = ['TABLE' if row[0] == 'BASE TABLE' else row[0]
for row in res]
return object_types
def get_db_objects(self, obj_type, columns='names', get_columns=False,
need_backtick=False):
"""Return a result set containing a list of objects for a given
database based on type.
This method returns either a list of names for the object type
specified, a brief list of minimal columns for creating the
objects, or the full list of columns from INFORMATION_SCHEMA. It can
also provide the list of column names if desired.
obj_type[in] Type of object to retrieve
columns[in] Column mode - names (default), brief, or full
Note: not valid for GRANT objects.
get_columns[in] If True, return column names as first element
and result set as second element. If False,
return only the result set.
need_backtick[in] If True, it returns any identifiers, e.g. table and
column names, quoted with backticks.
By default, False.
TODO: Change implementation to return classes instead of a result set.
Returns mysql.connector result set
"""
exclude_param = ""
if obj_type == _TABLE:
_NAMES = """
SELECT DISTINCT TABLES.TABLE_NAME
"""
names_pos_to_quote = (0,)
_FULL = """
SELECT TABLES.TABLE_CATALOG, TABLES.TABLE_SCHEMA,
TABLES.TABLE_NAME, TABLES.TABLE_TYPE,
TABLES.ENGINE, TABLES.VERSION, TABLES.ROW_FORMAT,
TABLES.TABLE_ROWS, TABLES.AVG_ROW_LENGTH, TABLES.DATA_LENGTH,
TABLES.MAX_DATA_LENGTH, TABLES.INDEX_LENGTH, TABLES.DATA_FREE,
TABLES.AUTO_INCREMENT, TABLES.CREATE_TIME, TABLES.UPDATE_TIME,
TABLES.CHECK_TIME, TABLES.TABLE_COLLATION, TABLES.CHECKSUM,
TABLES.CREATE_OPTIONS, TABLES.TABLE_COMMENT,
COLUMNS.ORDINAL_POSITION, COLUMNS.COLUMN_NAME,
COLUMNS.COLUMN_TYPE, COLUMNS.IS_NULLABLE,
COLUMNS.COLUMN_DEFAULT, COLUMNS.COLUMN_KEY,
REFERENTIAL_CONSTRAINTS.CONSTRAINT_NAME,
REFERENTIAL_CONSTRAINTS.REFERENCED_TABLE_NAME,
REFERENTIAL_CONSTRAINTS.UNIQUE_CONSTRAINT_NAME,
REFERENTIAL_CONSTRAINTS.UNIQUE_CONSTRAINT_SCHEMA,
REFERENTIAL_CONSTRAINTS.UPDATE_RULE,
REFERENTIAL_CONSTRAINTS.DELETE_RULE,
KEY_COLUMN_USAGE.CONSTRAINT_NAME,
KEY_COLUMN_USAGE.COLUMN_NAME AS COL_NAME,
KEY_COLUMN_USAGE.REFERENCED_TABLE_SCHEMA,
KEY_COLUMN_USAGE.REFERENCED_COLUMN_NAME
"""
full_pos_to_quote = (1, 2, 22, 27, 28, 29, 30, 33, 34, 35, 36)
_MINIMAL = """
SELECT TABLES.TABLE_SCHEMA, TABLES.TABLE_NAME, TABLES.ENGINE,
COLUMNS.ORDINAL_POSITION, COLUMNS.COLUMN_NAME,
COLUMNS.COLUMN_TYPE, COLUMNS.IS_NULLABLE,
COLUMNS.COLUMN_DEFAULT, COLUMNS.COLUMN_KEY,
TABLES.TABLE_COLLATION,
TABLES.CREATE_OPTIONS,
REFERENTIAL_CONSTRAINTS.CONSTRAINT_NAME,
REFERENTIAL_CONSTRAINTS.REFERENCED_TABLE_NAME,
REFERENTIAL_CONSTRAINTS.UNIQUE_CONSTRAINT_NAME,
REFERENTIAL_CONSTRAINTS.UPDATE_RULE,
REFERENTIAL_CONSTRAINTS.DELETE_RULE,
KEY_COLUMN_USAGE.CONSTRAINT_NAME,
KEY_COLUMN_USAGE.COLUMN_NAME AS COL_NAME,
KEY_COLUMN_USAGE.REFERENCED_TABLE_SCHEMA,
KEY_COLUMN_USAGE.REFERENCED_COLUMN_NAME
"""
minimal_pos_to_quote = (0, 1, 4, 11, 12, 13, 16, 17, 18, 19)
_OBJECT_QUERY = """
FROM INFORMATION_SCHEMA.TABLES JOIN INFORMATION_SCHEMA.COLUMNS ON
TABLES.TABLE_SCHEMA = COLUMNS.TABLE_SCHEMA AND
TABLES.TABLE_NAME = COLUMNS.TABLE_NAME
LEFT JOIN INFORMATION_SCHEMA.REFERENTIAL_CONSTRAINTS ON
TABLES.TABLE_SCHEMA = REFERENTIAL_CONSTRAINTS.CONSTRAINT_SCHEMA
AND
TABLES.TABLE_NAME = REFERENTIAL_CONSTRAINTS.TABLE_NAME
LEFT JOIN INFORMATION_SCHEMA.KEY_COLUMN_USAGE ON
TABLES.TABLE_SCHEMA = KEY_COLUMN_USAGE.CONSTRAINT_SCHEMA
AND
TABLES.TABLE_NAME = KEY_COLUMN_USAGE.TABLE_NAME
WHERE TABLES.TABLE_SCHEMA = '%s' AND TABLE_TYPE <> 'VIEW' %s
GROUP BY TABLES.TABLE_SCHEMA, TABLES.TABLE_NAME,
COLUMNS.ORDINAL_POSITION
ORDER BY TABLES.TABLE_SCHEMA, TABLES.TABLE_NAME,
COLUMNS.ORDINAL_POSITION
"""
exclude_param = "TABLES.TABLE_NAME"
elif obj_type == _VIEW:
_NAMES = """
SELECT TABLE_NAME
"""
names_pos_to_quote = (0,)
_FULL = """
SELECT TABLE_CATALOG, TABLE_SCHEMA, TABLE_NAME, VIEW_DEFINITION,
CHECK_OPTION, IS_UPDATABLE, DEFINER, SECURITY_TYPE,
CHARACTER_SET_CLIENT, COLLATION_CONNECTION
"""
full_pos_to_quote = (1, 2)
_MINIMAL = """
SELECT TABLE_SCHEMA, TABLE_NAME, DEFINER, SECURITY_TYPE,
VIEW_DEFINITION, CHECK_OPTION, IS_UPDATABLE,
CHARACTER_SET_CLIENT, COLLATION_CONNECTION
"""
minimal_pos_to_quote = (0, 1)
_OBJECT_QUERY = """
FROM INFORMATION_SCHEMA.VIEWS
WHERE TABLE_SCHEMA = '%s' %s
"""
exclude_param = "VIEWS.TABLE_NAME"
elif obj_type == _TRIG:
_NAMES = """
SELECT TRIGGER_NAME
"""
names_pos_to_quote = (0,)
_FULL = """
SELECT TRIGGER_CATALOG, TRIGGER_SCHEMA, TRIGGER_NAME,
EVENT_MANIPULATION, EVENT_OBJECT_CATALOG,
EVENT_OBJECT_SCHEMA, EVENT_OBJECT_TABLE, ACTION_ORDER,
ACTION_CONDITION, ACTION_STATEMENT, ACTION_ORIENTATION,
ACTION_TIMING, ACTION_REFERENCE_OLD_TABLE,
ACTION_REFERENCE_NEW_TABLE, ACTION_REFERENCE_OLD_ROW,
ACTION_REFERENCE_NEW_ROW, CREATED, SQL_MODE, DEFINER,
CHARACTER_SET_CLIENT, COLLATION_CONNECTION,
DATABASE_COLLATION
"""
full_pos_to_quote = (1, 2, 5, 6) # 9 ?
_MINIMAL = """
SELECT TRIGGER_NAME, DEFINER, EVENT_MANIPULATION,
EVENT_OBJECT_SCHEMA, EVENT_OBJECT_TABLE,
ACTION_ORIENTATION, ACTION_TIMING,
ACTION_STATEMENT, SQL_MODE,
CHARACTER_SET_CLIENT, COLLATION_CONNECTION,
DATABASE_COLLATION
"""
# Note: 7 (ACTION_STATEMENT) might require special handling
minimal_pos_to_quote = (0, 3, 4)
_OBJECT_QUERY = """
FROM INFORMATION_SCHEMA.TRIGGERS
WHERE TRIGGER_SCHEMA = '%s' %s
"""
exclude_param = "TRIGGERS.TRIGGER_NAME"
elif obj_type == _PROC:
_NAMES = """
SELECT NAME
"""
names_pos_to_quote = (0,)
_FULL = """
SELECT DB, NAME, TYPE, SPECIFIC_NAME, LANGUAGE, SQL_DATA_ACCESS,
IS_DETERMINISTIC, SECURITY_TYPE, PARAM_LIST, RETURNS, BODY,
DEFINER, CREATED, MODIFIED, SQL_MODE, COMMENT,
CHARACTER_SET_CLIENT, COLLATION_CONNECTION, DB_COLLATION,
BODY_UTF8
"""
full_pos_to_quote = (0, 1, 3)
_MINIMAL = """
SELECT NAME, LANGUAGE, SQL_DATA_ACCESS, IS_DETERMINISTIC,
SECURITY_TYPE, DEFINER, PARAM_LIST, RETURNS,
BODY, SQL_MODE,
CHARACTER_SET_CLIENT, COLLATION_CONNECTION,
DB_COLLATION
"""
minimal_pos_to_quote = (0,)
_OBJECT_QUERY = """
FROM mysql.proc
WHERE DB = '%s' AND TYPE = 'PROCEDURE' %s
"""
exclude_param = "NAME"
elif obj_type == _FUNC:
_NAMES = """
SELECT NAME
"""
names_pos_to_quote = (0,)
_FULL = """
SELECT DB, NAME, TYPE, SPECIFIC_NAME, LANGUAGE, SQL_DATA_ACCESS,
IS_DETERMINISTIC, SECURITY_TYPE, PARAM_LIST, RETURNS, BODY,
DEFINER, CREATED, MODIFIED, SQL_MODE, COMMENT,
CHARACTER_SET_CLIENT, COLLATION_CONNECTION, DB_COLLATION,
BODY_UTF8
"""
full_pos_to_quote = (0, 1, 3)
_MINIMAL = """
SELECT NAME, LANGUAGE, SQL_DATA_ACCESS, IS_DETERMINISTIC,
SECURITY_TYPE, DEFINER, PARAM_LIST, RETURNS,
BODY, SQL_MODE,
CHARACTER_SET_CLIENT, COLLATION_CONNECTION,
DB_COLLATION
"""
minimal_pos_to_quote = (0,)
_OBJECT_QUERY = """
FROM mysql.proc
WHERE DB = '%s' AND TYPE = 'FUNCTION' %s
"""
exclude_param = "NAME"
elif obj_type == _EVENT:
_NAMES = """
SELECT NAME
"""
names_pos_to_quote = (0,)
_FULL = """
SELECT DB, NAME, BODY, DEFINER, EXECUTE_AT, INTERVAL_VALUE,
INTERVAL_FIELD, CREATED, MODIFIED, LAST_EXECUTED, STARTS,
ENDS, STATUS, ON_COMPLETION, SQL_MODE, COMMENT, ORIGINATOR,
TIME_ZONE, CHARACTER_SET_CLIENT, COLLATION_CONNECTION,
DB_COLLATION, BODY_UTF8
"""
full_pos_to_quote = (0, 1)
_MINIMAL = """
SELECT NAME, DEFINER, BODY, STATUS,
EXECUTE_AT, INTERVAL_VALUE, INTERVAL_FIELD, SQL_MODE,
STARTS, ENDS, STATUS, ON_COMPLETION, ORIGINATOR,
CHARACTER_SET_CLIENT, COLLATION_CONNECTION,
DB_COLLATION
"""
minimal_pos_to_quote = (0,)
_OBJECT_QUERY = """
FROM mysql.event
WHERE DB = '%s' %s
"""
exclude_param = "NAME"
elif obj_type == _GRANT:
_OBJECT_QUERY = """
(
SELECT GRANTEE, PRIVILEGE_TYPE, TABLE_SCHEMA,
NULL as TABLE_NAME, NULL AS COLUMN_NAME,
NULL AS ROUTINE_NAME
FROM INFORMATION_SCHEMA.SCHEMA_PRIVILEGES
WHERE table_schema = '%s'
) UNION (
SELECT grantee, privilege_type, table_schema, table_name,
NULL, NULL
FROM INFORMATION_SCHEMA.TABLE_PRIVILEGES
WHERE table_schema = '%s'
) UNION (
SELECT grantee, privilege_type, table_schema, table_name,
column_name, NULL
FROM INFORMATION_SCHEMA.COLUMN_PRIVILEGES
WHERE table_schema = '%s'
) UNION (
SELECT CONCAT('''', User, '''@''', Host, ''''), Proc_priv, Db,
Routine_name, NULL, Routine_type
FROM mysql.procs_priv WHERE Db = '%s'
) ORDER BY GRANTEE ASC, PRIVILEGE_TYPE ASC, TABLE_SCHEMA ASC,
TABLE_NAME ASC, COLUMN_NAME ASC, ROUTINE_NAME ASC
"""
else:
return None
col_options = {
'columns': get_columns
}
pos_to_quote = ()
if obj_type == _GRANT:
query = _OBJECT_QUERY % (self.db_name, self.db_name,
self.db_name, self.db_name)
return self.source.exec_query(query, col_options)
else:
if columns == "names":
prefix = _NAMES
if need_backtick:
pos_to_quote = names_pos_to_quote
elif columns == "full":
prefix = _FULL
if need_backtick:
pos_to_quote = full_pos_to_quote
else:
prefix = _MINIMAL
if need_backtick:
pos_to_quote = minimal_pos_to_quote
# Form exclusion string
exclude_str = ""
if self.exclude_patterns:
exclude_str = self.__build_exclude_patterns(exclude_param)
query = prefix + _OBJECT_QUERY % (self.db_name, exclude_str)
res = self.source.exec_query(query, col_options)
# Quote required identifiers with backticks
if need_backtick:
# function to quote row elements at a given positions
# quote = lambda pos, obj: quote_with_backticks(obj) \
# if obj and pos in pos_to_quote else obj
new_rows = []
for row in res[1]:
# recreate row tuple quoting needed elements with backticks
#r = tuple([quote(i, data) for i, data in enumerate(row)])
r = tuple([quote_with_backticks(data)
if data and i in pos_to_quote else data
for i, data in enumerate(row)])
new_rows.append(r)
# set new result set with required data quoted with backticks
res = (res[0], new_rows)
return res
def _check_user_permissions(self, uname, host, access):
"""Check user permissions for a given privilege
uname[in] user name to check
host[in] host name of connection
access[in] privilege to check (e.g. "SELECT")
Returns True if user has permission, False if not
"""
user = User(self.source, uname + '@' + host)
result = user.has_privilege(access[0], '*', access[1])
return result
def check_read_access(self, user, host, options):
"""Check access levels for reading database objects
This method will check the user's permission levels for copying a
database from this server.
It will also skip specific checks if certain objects are not being
copied (i.e., views, procs, funcs, grants).
user[in] user name to check
host[in] host name to check
options[in] dictionary of values to include:
skip_views True = no views processed
skip_proc True = no procedures processed
skip_func True = no functions processed
skip_grants True = no grants processed
skip_events True = no events processed
Returns True if user has permissions and raises a UtilDBError if the
user does not have permission with a message that includes
the server context.
"""
# Build minimal list of privileges for source access
source_privs = []
priv_tuple = (self.db_name, "SELECT")
source_privs.append(priv_tuple)
# if views are included, we need SHOW VIEW
if not options.get('skip_views', False):
priv_tuple = (self.db_name, "SHOW VIEW")
source_privs.append(priv_tuple)
# if procs or funcs are included, we need read on mysql db
if not options.get('skip_proc', False) or \
not options.get('skip_func', False):
priv_tuple = ("mysql", "SELECT")
source_privs.append(priv_tuple)
# if events, we need event
if not options.get('skip_events', False):
priv_tuple = (self.db_name, "EVENT")
source_privs.append(priv_tuple)
# Check permissions on source
for priv in source_privs:
if not self._check_user_permissions(user, host, priv):
raise UtilDBError("User %s on the %s server does not have "
"permissions to read all objects in %s. " %
(user, self.source.role, self.db_name) +
"User needs %s privilege on %s." %
(priv[1], priv[0]), -1, priv[0])
return True
def check_write_access(self, user, host, options):
"""Check access levels for creating and writing database objects
This method will check the user's permission levels for copying a
database to this server.
It will also skip specific checks if certain objects are not being
copied (i.e., views, procs, funcs, grants).
user[in] user name to check
host[in] host name to check
options[in] dictionary of values to include:
skip_views True = no views processed
skip_proc True = no procedures processed
skip_func True = no functions processed
skip_grants True = no grants processed
skip_events True = no events processed
Returns True if user has permissions and raises a UtilDBError if the
user does not have permission with a message that includes
the server context.
"""
dest_privs = [(self.db_name, "CREATE"),
(self.db_name, "SUPER"),
("*", "SUPER")]
if not options.get('skip_grants', False):
priv_tuple = (self.db_name, "WITH GRANT OPTION")
dest_privs.append(priv_tuple)
# Check privileges on destination
for priv in dest_privs:
if not self._check_user_permissions(user, host, priv):
raise UtilDBError("User %s on the %s server does not "
"have permissions to create all objects "
"in %s. User needs %s privilege on %s." %
(user, self.source.role, priv[0], priv[1],
priv[0]), -1, priv[0])
return True
@staticmethod
def parse_object_name(qualified_name):
"""Parse db, name from db.name
qualified_name[in] MySQL object string (e.g. db.table)
Returns tuple containing name split
"""
# Split the qualified name considering backtick quotes
parts = re.match(REGEXP_QUALIFIED_OBJ_NAME, qualified_name)
if parts:
return parts.groups()
else:
return (None, None)
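# For example, assuming REGEXP_QUALIFIED_OBJ_NAME matches as expected:
#   Database.parse_object_name('world.city') -> ('world', 'city')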
#
# Copyright (c) 2013, Oracle and/or its affiliates. All rights reserved.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; version 2 of the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
#
"""
This file contains a module to read .frm files and attempt to create a
facsimile of the CREATE TABLE command.
"""
import bisect
import os
import stat
import struct
import time
from mysql.utilities.common.charsets import CharsetInfo
from mysql.utilities.exception import UtilError
from pprint import pprint
#
# Definitions and types for interpreting the .frm file values.
#
# Misc. constants
_PORTABLE_SIZEOF_CHAR_PTR = 8
_MY_CHARSET_BIN_NUM = 63
_HA_NOSAME = 1
_DIG2BYTES = [0, 1, 1, 2, 2, 3, 3, 4, 4, 4]
_DIG_PER_DEC1 = 9
_HEADER_LEN = 64
_TABLE_TYPE = 0x01fe # Magic number for table .frm files
_VIEW_TYPE = 0x5954 # Magic number for view .frm files
_FIELD_NR_MASK = 16383
_HA_USES_COMMENT = 4096
# MySQL data type definitions
_MYSQL_TYPE_DECIMAL = 0
_MYSQL_TYPE_TINY = 1
_MYSQL_TYPE_SHORT = 2
_MYSQL_TYPE_LONG = 3
_MYSQL_TYPE_FLOAT = 4
_MYSQL_TYPE_DOUBLE = 5
_MYSQL_TYPE_NULL = 6
_MYSQL_TYPE_TIMESTAMP = 7
_MYSQL_TYPE_LONGLONG = 8
_MYSQL_TYPE_INT24 = 9
_MYSQL_TYPE_DATE = 10
_MYSQL_TYPE_TIME = 11
_MYSQL_TYPE_DATETIME = 12
_MYSQL_TYPE_YEAR = 13
_MYSQL_TYPE_NEWDATE = 14
_MYSQL_TYPE_VARCHAR = 15
_MYSQL_TYPE_BIT = 16
_MYSQL_TYPE_TIMESTAMP2 = 17
_MYSQL_TYPE_DATETIME2 = 18
_MYSQL_TYPE_TIME2 = 19
_MYSQL_TYPE_NEWDECIMAL = 246
_MYSQL_TYPE_ENUM = 247
_MYSQL_TYPE_SET = 248
_MYSQL_TYPE_TINY_BLOB = 249
_MYSQL_TYPE_MEDIUM_BLOB = 250
_MYSQL_TYPE_LONG_BLOB = 251
_MYSQL_TYPE_BLOB = 252
_MYSQL_TYPE_VAR_STRING = 253
_MYSQL_TYPE_STRING = 254
_MYSQL_TYPE_GEOMETRY = 255
# Mapping of field data types to data type names
_col_types = [
{'value': _MYSQL_TYPE_DECIMAL, 'text': 'decimal', 'size': None},
{'value': _MYSQL_TYPE_TINY, 'text': 'tinyint', 'size': 1},
{'value': _MYSQL_TYPE_SHORT, 'text': 'smallint', 'size': 2},
{'value': _MYSQL_TYPE_LONG, 'text': 'int', 'size': 4},
{'value': _MYSQL_TYPE_FLOAT, 'text': 'float', 'size': 4},
{'value': _MYSQL_TYPE_DOUBLE, 'text': 'double', 'size': 8},
{'value': _MYSQL_TYPE_NULL, 'text': 'NULL', 'size': 0},
{'value': _MYSQL_TYPE_TIMESTAMP, 'text': 'timestamp', 'size': 4},
{'value': _MYSQL_TYPE_LONGLONG, 'text': 'bigint', 'size': 8},
{'value': _MYSQL_TYPE_INT24, 'text': 'mediumint', 'size': 3},
{'value': _MYSQL_TYPE_DATE, 'text': 'date', 'size': 4},
{'value': _MYSQL_TYPE_TIME, 'text': 'time', 'size': 3},
{'value': _MYSQL_TYPE_DATETIME, 'text': 'datetime', 'size': 8},
{'value': _MYSQL_TYPE_YEAR, 'text': 'year', 'size': 1},
{'value': _MYSQL_TYPE_NEWDATE, 'text': 'date', 'size': 3},
# Size must be calculated
{'value': _MYSQL_TYPE_VARCHAR, 'text': 'varchar', 'size': -1},
# Size must be calculated
{'value': _MYSQL_TYPE_BIT, 'text': 'bit', 'size': -2},
{'value': _MYSQL_TYPE_TIMESTAMP2, 'text': 'timestamp', 'size': 4},
{'value': _MYSQL_TYPE_DATETIME2, 'text': 'datetime', 'size': 8},
{'value': _MYSQL_TYPE_TIME2, 'text': 'time', 'size': 3},
{'value': _MYSQL_TYPE_NEWDECIMAL, 'text': 'decimal', 'size': None},
{'value': _MYSQL_TYPE_ENUM, 'text': 'enum', 'size': 0},
{'value': _MYSQL_TYPE_SET, 'text': 'set', 'size': 0},
{'value': _MYSQL_TYPE_TINY_BLOB, 'text': 'tinyblob',
'size': 1 + _PORTABLE_SIZEOF_CHAR_PTR},
{'value': _MYSQL_TYPE_MEDIUM_BLOB, 'text': 'mediumblob',
'size': 3 + _PORTABLE_SIZEOF_CHAR_PTR},
{'value': _MYSQL_TYPE_LONG_BLOB, 'text': 'longblob',
'size': 4 + _PORTABLE_SIZEOF_CHAR_PTR},
{'value': _MYSQL_TYPE_BLOB, 'text': 'blob',
'size': 2 + _PORTABLE_SIZEOF_CHAR_PTR},
# Size must be calculated
{'value': _MYSQL_TYPE_VAR_STRING, 'text': 'varchar', 'size': -1},
{'value': _MYSQL_TYPE_STRING, 'text': 'char', 'size': None},
{'value': _MYSQL_TYPE_GEOMETRY, 'text': 'geometry',
'size': 4 + _PORTABLE_SIZEOF_CHAR_PTR},
]
_col_keys = [item['value'] for item in _col_types]
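# Sketch (not from the original source): because _col_types is ordered by
# 'value', _col_keys is sorted, so a field type code read from the .frm file
# can be mapped to its type name with a bisect lookup, e.g.:
#
#   def _lookup_col_type(type_code):
#       return _col_types[bisect.bisect_left(_col_keys, type_code)]['text']
#
#   _lookup_col_type(_MYSQL_TYPE_VARCHAR)   # -> 'varchar'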
# Database/engine type definitions
_DB_TYPE_UNKNOWN = 0
_DB_TYPE_DIAB_ISAM = 1
_DB_TYPE_HASH = 2
_DB_TYPE_MISAM = 3
_DB_TYPE_PISAM = 4
_DB_TYPE_RMS_ISAM = 5
_DB_TYPE_HEAP = 6
_DB_TYPE_ISAM = 7
_DB_TYPE_MRG_ISAM = 8
_DB_TYPE_MYISAM = 9
_DB_TYPE_MRG_MYISAM = 10
_DB_TYPE_BERKELEY_DB = 11
_DB_TYPE_INNODB = 12
_DB_TYPE_GEMINI = 13
_DB_TYPE_NDBCLUSTER = 14
_DB_TYPE_EXAMPLE_DB = 15
_DB_TYPE_ARCHIVE_DB = 16
_DB_TYPE_CSV_DB = 17
_DB_TYPE_FEDERATED_DB = 18
_DB_TYPE_BLACKHOLE_DB = 19
_DB_TYPE_PARTITION_DB = 20
_DB_TYPE_BINLOG = 21
_DB_TYPE_SOLID = 22
_DB_TYPE_PBXT = 23
_DB_TYPE_TABLE_FUNCTION = 24
_DB_TYPE_MEMCACHE = 25
_DB_TYPE_FALCON = 26
_DB_TYPE_MARIA = 27
_DB_TYPE_PERFORMANCE_SCHEMA = 28
_DB_TYPE_FIRST_DYNAMIC = 42
_DB_TYPE_DEFAULT = 127
# Mapping of engine types to engine names
_engine_types = [
{'value': _DB_TYPE_UNKNOWN, 'text': 'UNKNOWN'},
{'value': _DB_TYPE_DIAB_ISAM, 'text': 'ISAM'},
{'value': _DB_TYPE_HASH, 'text': 'HASH'},
{'value': _DB_TYPE_MISAM, 'text': 'MISAM'},
{'value': _DB_TYPE_PISAM, 'text': 'PISAM'},
{'value': _DB_TYPE_RMS_ISAM, 'text': 'RMS_ISAM'},
{'value': _DB_TYPE_HEAP, 'text': 'HEAP'},
{'value': _DB_TYPE_ISAM, 'text': 'ISAM'},
{'value': _DB_TYPE_MRG_ISAM, 'text': 'MERGE'},
{'value': _DB_TYPE_MYISAM, 'text': 'MYISAM'},
{'value': _DB_TYPE_MRG_MYISAM, 'text': 'MERGE'},
{'value': _DB_TYPE_BERKELEY_DB, 'text': 'BDB'},
{'value': _DB_TYPE_INNODB, 'text': 'INNODB'},
{'value': _DB_TYPE_GEMINI, 'text': 'GEMINI'},
{'value': _DB_TYPE_NDBCLUSTER, 'text': 'NDBCLUSTER'},
{'value': _DB_TYPE_EXAMPLE_DB, 'text': 'EXAMPLE'},
{'value': _DB_TYPE_ARCHIVE_DB, 'text': 'ARCHIVE'},
{'value': _DB_TYPE_CSV_DB, 'text': 'CSV'},
{'value': _DB_TYPE_FEDERATED_DB, 'text': 'FEDERATED'},
{'value': _DB_TYPE_BLACKHOLE_DB, 'text': 'BLACKHOLE'},
{'value': _DB_TYPE_PARTITION_DB, 'text': 'PARTITION'},
{'value': _DB_TYPE_BINLOG, 'text': 'BINLOG'},
{'value': _DB_TYPE_SOLID, 'text': 'SOLID'},
{'value': _DB_TYPE_PBXT, 'text': 'PBXT'},
{'value': _DB_TYPE_TABLE_FUNCTION, 'text': 'FUNCTION'},
{'value': _DB_TYPE_MEMCACHE, 'text': 'MEMCACHE'},
{'value': _DB_TYPE_FALCON, 'text': 'FALCON'},
{'value': _DB_TYPE_MARIA, 'text': 'MARIA'},
{'value': _DB_TYPE_PERFORMANCE_SCHEMA, 'text': 'PERFORMANCE_SCHEMA'},
{'value': _DB_TYPE_FIRST_DYNAMIC, 'text': 'DYNAMIC'},
{'value': _DB_TYPE_DEFAULT, 'text': 'DEFAULT'},
]
_engine_keys = [item['value'] for item in _engine_types]
# Key algorithms
_KEY_ALG = ['UNDEFINED', 'BTREE', 'RTREE', 'HASH', 'FULLTEXT']
# Format definitions
# The comment rulers below are read vertically: the pair above each format
# string numbers the character positions of the codes, and the pair below
# each string lists the byte offsets of the unpacked fields.
#                            1         2         3
#                  01234567890123456789012345678901
_HEADER_FORMAT = "<BBBBHHIHHIHHHHHBBIBBBBBIIIIBBBHH"
#                        11122222333333444445556666
#                  12346824602468023489012371590124
#             ***       111111
#             0123456789012345
_COL_DATA = "<BBBBBBBBBBBBBBBH"
#            0123456789111111
#                       012345
# Various flags copied from server source code - some may not be used but
# may find a use as more esoteric table configurations are tested. These
# are derived from fields.h and all may not apply but are included for
# future expansion/features.
_FIELDFLAG_DECIMAL = 1
_FIELDFLAG_BINARY = 1
_FIELDFLAG_NUMBER = 2
_FIELDFLAG_ZEROFILL = 4
_FIELDFLAG_PACK = 120 # Bits used for packing
_FIELDFLAG_INTERVAL = 256 # mangled with decimals!
_FIELDFLAG_BITFIELD = 512 # mangled with decimals!
_FIELDFLAG_BLOB = 1024 # mangled with decimals!
_FIELDFLAG_GEOM = 2048 # mangled with decimals!
_FIELDFLAG_TREAT_BIT_AS_CHAR = 4096 # use Field_bit_as_char
_FIELDFLAG_LEFT_FULLSCREEN = 8192
_FIELDFLAG_RIGHT_FULLSCREEN = 16384
_FIELDFLAG_FORMAT_NUMBER = 16384 # predit: ###,,## in output
_FIELDFLAG_NO_DEFAULT = 16384 # sql
_FIELDFLAG_SUM = 32768 # predit: +#fieldflag
_FIELDFLAG_MAYBE_NULL = 32768 # sql
_FIELDFLAG_HEX_ESCAPE = 0x10000
_FIELDFLAG_PACK_SHIFT = 3
_FIELDFLAG_DEC_SHIFT = 8
_FIELDFLAG_MAX_DEC = 31
_FIELDFLAG_NUM_SCREEN_TYPE = 0x7F01
_FIELDFLAG_ALFA_SCREEN_TYPE = 0x7800
# Additional flags
_NOT_NULL_FLAG = 1 # Field can't be NULL
_PRI_KEY_FLAG = 2 # Field is part of a primary key
_UNIQUE_KEY_FLAG = 4 # Field is part of a unique key
_MULTIPLE_KEY_FLAG = 8 # Field is part of a key
_BLOB_FLAG = 16 # Field is a blob
_UNSIGNED_FLAG = 32 # Field is unsigned
_HA_PACK_RECORD = 1 # Pack record?
_HA_FULLTEXT = 128 # For full-text search
_HA_SPATIAL = 1024 # For spatial search
# Row type definitions
_ROW_TYPE_DEFAULT, _ROW_TYPE_FIXED, _ROW_TYPE_DYNAMIC, _ROW_TYPE_COMPRESSED, \
_ROW_TYPE_REDUNDANT, _ROW_TYPE_COMPACT, _ROW_TYPE_PAGE = range(0, 7)
# enum utypes from field.h
_NONE, _DATE, _SHIELD, _NOEMPTY, _CASEUP, _PNR, _BGNR, _PGNR, _YES, _NO, \
_REL, _CHECK, _EMPTY, _UNKNOWN_FIELD, _CASEDN, _NEXT_NUMBER, \
_INTERVAL_FIELD, _BIT_FIELD, _TIMESTAMP_OLD_FIELD, _CAPITALIZE, \
_BLOB_FIELD, _TIMESTAMP_DN_FIELD, _TIMESTAMP_UN_FIELD, \
_TIMESTAMP_DNUN_FIELD = range(0, 24)
# Array of field data types that can be unsigned
_UNSIGNED_FIELDS = ['TINYINT', 'SMALLINT', 'MEDIUMINT', 'INT', 'INTEGER',
'BIGINT', 'REAL', 'DOUBLE', 'FLOAT', 'DECIMAL', 'NUMERIC']
# Array of field data types that can have character set options
_CS_ENABLED = ['CHAR', 'VARCHAR', 'TINYBLOB', 'BLOB', 'MEDIUMBLOB', 'LONGBLOB',
'ENUM', 'SET']
# Array of index (key) types
_KEY_TYPES = ['PRIMARY', 'UNIQUE', 'MULTIPLE', 'FULLTEXT', 'SPATIAL',
'FOREIGN_KEY']
# Array of field data types that do not require parens for size
_NO_PARENS = ['TIMESTAMP', 'DATETIME', 'YEAR', 'DATE', 'TIME',
'TINYBLOB', 'BLOB', 'MEDIUMBLOB', 'LONGBLOB',
'TINYTEXT', 'TEXT', 'MEDIUMTEXT', 'LONGTEXT']
# Array of field data types that are real data
_REAL_TYPES = ['REAL', 'DOUBLE', 'FLOAT', 'DECIMAL', 'NUMERIC']
# Array of blob data types
_BLOB_TYPES = [_MYSQL_TYPE_TINY_BLOB, _MYSQL_TYPE_MEDIUM_BLOB,
_MYSQL_TYPE_LONG_BLOB, _MYSQL_TYPE_BLOB,
_MYSQL_TYPE_GEOMETRY]
# Array of data types that do not use keysize for indexes
_NO_KEYSIZE = ['BIT', 'ENUM', 'SET', 'DECIMAL', 'NUMERIC',
'TIMESTAMP', 'TIME', 'DATETIME']
def _is_decimal(col):
"""Check for decimal data types
Returns bool - True if column is decimal or numeric.
"""
return col['field_type_name'].upper() in ['DECIMAL', 'NUMERIC']
def _is_cs_enabled(col):
"""Check for data types that accept character set option
Returns bool - True if column supports character set option.
"""
return col['field_type_name'].upper() in _CS_ENABLED
def _is_unsigned(col):
"""Check for unsigned data types
Returns bool - True if column is an unsigned type.
"""
return col['field_type_name'].upper() in _UNSIGNED_FIELDS
def _is_real(col):
"""Check for real data types
Returns bool - True if column is a real type.
"""
return col['field_type_name'].upper() in _REAL_TYPES
def _is_no_parens(col):
"""Check for column uses parens for size
Returns bool - True if column needs parens for size.
"""
return col['field_type_name'].upper() in _NO_PARENS
def _is_blob(col):
"""Check for blob data types
Returns bool - True if column is a blob.
"""
return col['field_type'] in _BLOB_TYPES
def _is_geometry(flags):
"""Check for geometry field types
Returns bool - True if geometry type.
"""
print "flags: %0x" % flags
return (flags & _FIELDFLAG_GEOM) == _FIELDFLAG_GEOM
def _no_keysize(col):
"""Check for data types that do not use keysize
    Returns bool - True if column is to be excluded from keysize.
"""
return col['field_type_name'].upper() in _NO_KEYSIZE
def _print_default_values(values):
"""Print default values
    The method prints the default values 16 bytes per line in hexadecimal
and ASCII representation (similar to hexdump).
values[in] Array of default values
"""
num_bytes = len(values)
print "# Default values raw data:"
i = 0
while (i < num_bytes):
def_str = ""
j = 0
print "#",
while (j < 8) and (i < num_bytes):
print "%02x" % ord(values[i]),
def_str += values[i]
i += 1
j += 1
print "",
j = 0
while (j < 8) and (i < num_bytes):
print "%02x" % ord(values[i]),
def_str += values[i]
i += 1
j += 1
print " |",
print def_str
def _get_pack_length(col):
"""Find the pack length for the field
col[in] Column data read for the column to operate
Returns tuple - (pack_length, field_size)
"""
size = _col_types[bisect.bisect_left(_col_keys,
col['field_type'])]['size']
if size == -1:
col_len = col['bytes_in_col']
return (1 if int(col_len) < 256 else 2), col_len
if size == -2:
col_len = col['bytes_in_col']
return col_len / 8, col_len
if size is None:
return size, col['bytes_in_col'] # It's a string of some sort
return 0, size
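# Illustrative examples of _get_pack_length() (the column values are assumed
# for the sketch): a varchar column with 'bytes_in_col' == 300 hits the
# size == -1 case and yields (2, 300), because lengths >= 256 need a
# two-byte prefix; a timestamp column (fixed size 4 in _col_types above)
# yields (0, 4).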
def _get_blob_text(col):
"""Form the correct field name string for blobs and text fields
col[in] Column data read for the column to operate
Returns string - field name string
"""
type_str = ""
if col['field_type'] == _MYSQL_TYPE_TINY_BLOB:
type_str = "tiny"
elif col['field_type'] == _MYSQL_TYPE_MEDIUM_BLOB:
type_str = "medium"
elif col['field_type'] == _MYSQL_TYPE_LONG_BLOB:
type_str = "long"
if col['charset'] == _MY_CHARSET_BIN_NUM:
type_str = "".join([type_str, "blob"])
else:
type_str = "".join([type_str, "text"])
return type_str
def _format_default(col, col_flags, length, decimals):
"""Format a defaut value for printing
col[in] Column data dictionary
col_flags[in] Flags for column
length[in] Length of default value or integer part for floats
decimals[in] Number of decimal positions for floats
Returns string - default clause for CREATE statement.
"""
default = col['default']
if isinstance(default, str):
fmt_str = "'%s'"
# Check for zerofill:
elif col_flags & _FIELDFLAG_ZEROFILL:
if _is_real(col):
if decimals > 0 and decimals < length:
if col['field_type_name'].upper() == "DECIMAL":
length += 1
fmt_str = "'" + '%0' + "%s" % length + '.' + \
"%s" % decimals + 'f' + "'"
else:
fmt_str = "'" + '%0' + "%s" % length + '.' + 'f' + "'"
if float(default) == 0.0:
fmt_str = "%s"
default = "NULL"
else:
fmt_str = "'" + '%0' + "%s" % length + 'd' + "'"
else:
if _is_real(col):
if decimals > 0 and decimals < length:
fmt_str = "'" + '%' + "%s" % (length - 1) + '.' + \
"%s" % decimals + 'f' + "'"
elif decimals == 0:
fmt_str = "'%d'"
default = divmod(default, 1)[0]
else:
i, decm = divmod(default, 1)
if decm == 0:
fmt_str = "'%d'"
default = i
else:
fmt_str = "'%f'"
if float(default) == 0.0:
fmt_str = "%s"
default = "NULL"
else:
fmt_str = "'%d'"
return " DEFAULT " + fmt_str % default
class FrmReader(object):
"""
    This class implements an abstraction of the .frm file format. It can be
    used to produce a likeness of the CREATE TABLE command. It is not a 100%
    match because some of the components are missing from the .frm file. For
example, there are no character set or collation definitions stored so
unless one has access to the server definitions, these cannot be
determined.
The class permits the following operations:
- show_create_table_statement() - read a .frm file and print its CREATE
statement. Optionally displays statistics for the .frm file.
"""
def __init__(self, db_name, table, frm_path, options):
"""Constructor
        db_name[in]        the database (if known)
table[in] table name
frm_path[in] full path to .frm file
options[in] options for controlling behavior:
verbosity print extra data during operations (optional)
default value = 0
quiet suppress output except CREATE statement
default False
server path to server for server install
default None
new_engine substitute engine
default None
"""
self.general_data = None
self.key_data = None
self.comment_str = None
self.engine_str = None
self.partition_str = None
self.col_metadata = None
self.column_data = None
self.num_cols = 0
self.default_values = None
self.frm_file = None
self.verbosity = options.get('verbosity', 0)
self.quiet = options.get('quiet', False)
self.server = options.get('server', None)
self.new_engine = options.get('new_engine', None)
self.show_stats = options.get("show_stats", False)
self.db_name = db_name
self.table = table
self.frm_path = frm_path
self.options = options
if self.server is None:
self.csi = None
else:
self.csi = CharsetInfo(options)
def _read_header(self):
"""Read the header information from the file
"""
try:
# Skip to header position
if self.verbosity > 1:
print "# Skipping to header at : %0000x" % 2
self.frm_file.seek(2, 0)
data = self.frm_file.read(_HEADER_LEN)
except Exception, error:
if self.verbosity > 1:
print "EXCEPTION:", error
raise UtilError("Cannot read header.")
# Read header
header = struct.unpack(_HEADER_FORMAT, data)
engine_name = _engine_types[bisect.bisect_left(_engine_keys,
header[1])]['text']
self.general_data = {
'frm_version': header[0],
'legacy_db_type': engine_name,
'IO_SIZE': header[4],
'length': header[6],
'tmp_key_length': header[7],
'rec_length': header[8],
'max_rows': header[10],
'min_rows': header[11],
'db_create_pack': header[12] >> 8, # only want 1 byte
'key_info_length': header[13],
'create_options': header[14],
'frm_file_ver': header[16],
'avg_row_length': header[17],
'default_charset': header[18],
'row_type': header[20],
'charset_low': header[21],
'table_charset': (header[21] << 8) + header[18],
'key_length': header[24],
'MYSQL_VERSION_ID': header[25],
'extra_size': header[26],
'default_part_eng': header[29],
'key_block_size': header[30],
}
# Fix storage engine string if partitioning engine specified
if self.general_data['default_part_eng'] > 0 and \
self.new_engine is None:
self.engine_str = _engine_types[bisect.bisect_left(
_engine_keys, header[29])]['text']
return True
def _read_keys(self):
"""Read key fields from the file
"""
offset = self.general_data['IO_SIZE']
try:
# Skip ahead to key section
if self.verbosity > 1:
print "# Skipping to key data at : %0000x" % int(offset)
self.frm_file.seek(offset, 0)
except Exception, error:
if self.verbosity > 1:
print "EXCEPTION:", error
raise UtilError("Cannot locate keys.")
# Decipher key parts
num_keys = struct.unpack("<B", self.frm_file.read(1))[0]
if num_keys & 0x80:
next_byte = struct.unpack("<B", self.frm_file.read(1))[0]
num_keys = (next_byte << 7) | (num_keys & 0x7f)
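            # For example, the raw bytes 0x81 0x02 decode to
            # (0x02 << 7) | (0x81 & 0x7f) == 257 keys.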
low = struct.unpack("<B", self.frm_file.read(1))[0]
high = struct.unpack("<B", self.frm_file.read(1))[0]
num_key_parts = low + (high << 8)
self.frm_file.read(2)
else:
            num_key_parts = struct.unpack("<B", self.frm_file.read(1))[0]
self.frm_file.read(4)
self.key_data = {
'num_keys': num_keys,
'num_key_parts': num_key_parts,
'key_names': [],
'keys': [],
}
for i in range(0, self.key_data['num_keys']):
key_info = {
'flags': struct.unpack("<H", self.frm_file.read(2))[0],
'key_length': struct.unpack("<H", self.frm_file.read(2))[0],
'num_parts': struct.unpack("<B", self.frm_file.read(1))[0],
'algorithm': struct.unpack("<B", self.frm_file.read(1))[0],
'block_size': struct.unpack("<H", self.frm_file.read(2))[0],
'key_parts': [],
'comment': "",
}
for j in range(0, key_info['num_parts']):
if self.verbosity > 1:
print "# Reading key part %s." % j
key_part_info = {
'field_num': struct.unpack("<H",
self.frm_file.read(2))[0] &
_FIELD_NR_MASK,
'offset': struct.unpack("<H",
self.frm_file.read(2))[0] - 1,
'key_type': struct.unpack("<H",
self.frm_file.read(2))[0],
'key_part_flag': struct.unpack("<B",
self.frm_file.read(1))[0],
'length': struct.unpack("<H",
self.frm_file.read(2))[0],
}
key_info['key_parts'].append(key_part_info)
self.key_data['keys'].append(key_info)
terminator = struct.unpack("c", self.frm_file.read(1))[0]
for i in range(0, self.key_data['num_keys']):
key_name = ""
# Read until the next 0xff
char_read = ""
while char_read != terminator:
char_read = struct.unpack("c", self.frm_file.read(1))[0]
if char_read != terminator:
key_name += str(char_read)
self.key_data['key_names'].append(key_name)
# Now find the key comments!
self.frm_file.read(1)
for i in range(0, self.key_data['num_keys']):
if (self.key_data['keys'][i]['flags'] & _HA_USES_COMMENT) == \
_HA_USES_COMMENT:
k_len = struct.unpack("<H", self.frm_file.read(2))[0]
com_str = struct.unpack("c" * k_len, self.frm_file.read(k_len))
self.key_data['keys'][i]['comment'] = "".join(com_str)
return True
def _read_comment(self):
"""Read the table comments.
"""
# Fields can be found 1 IO_SIZE more than what has been read to date
# plus 46 bytes.
io_size = self.general_data['IO_SIZE']
record_offset = io_size + self.general_data['tmp_key_length'] + \
self.general_data['rec_length']
offset = (((record_offset / io_size) + 1) * io_size) + 46
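        # For example (illustrative numbers): with IO_SIZE 4096,
        # tmp_key_length 1024 and rec_length 500, record_offset is 5620 and
        # the comment block starts at ((5620 / 4096) + 1) * 4096 + 46 == 8238.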
try:
# Skip to column position
if self.verbosity > 1:
print "# Skipping to table comments at : %0000x" % int(offset)
self.frm_file.seek(offset, 0)
data = self.frm_file.read(1)
except Exception, error:
if self.verbosity > 1:
print "EXCEPTION:", error
raise UtilError("Cannot read table comment.")
comment_len = struct.unpack("<B", data)[0]
com_chars = struct.unpack("c" * comment_len,
self.frm_file.read(comment_len))
self.comment_str = "".join(com_chars)
return True
def _read_default_values(self):
"""Read the default values for all columns
"""
offset = self.general_data['IO_SIZE'] + \
self.general_data['tmp_key_length']
try:
# Skip ahead to key section
if self.verbosity > 1:
print "# Skipping to default data at : %0000x" % \
int(offset + 1)
self.frm_file.seek(offset + 1, 0)
except Exception, error:
if self.verbosity > 1:
print "EXCEPTION:", error
raise UtilError("Cannot find default data.")
num_bytes = self.general_data['rec_length']
# allow overflow
self.default_values = self.frm_file.read(num_bytes + 100)
def _read_engine_data(self):
"""Read the storage engine data.
"""
# We must calculate the location of the partition information by
# locating the storage engine name and if it is 'partition' then read
# the partition string following that.
offset = self.general_data['IO_SIZE'] + \
self.general_data['tmp_key_length'] + \
self.general_data['rec_length']
try:
# Skip ahead to key section
if self.verbosity > 1:
print "# Skipping to keys at : %0000x" % int(offset + 2)
self.frm_file.seek(offset + 2, 0)
except Exception, error:
if self.verbosity > 1:
print "EXCEPTION:", error
raise UtilError("Cannot find engine data.")
engine_len = struct.unpack("<H", self.frm_file.read(2))[0]
engine_str = "".join(struct.unpack("c" * engine_len,
self.frm_file.read(engine_len)))
# Save engine name unless user specified a new engine to use
if self.engine_str is None:
if self.new_engine is None:
self.engine_str = engine_str
else:
self.engine_str = self.new_engine
part_len = struct.unpack("<I", self.frm_file.read(4))[0]
part_str = "".join(struct.unpack("c" * part_len,
self.frm_file.read(part_len)))
self.partition_str = " ".join(part_str.split('\n'))
return True
def _read_column_names(self, fields_per_screen):
"""Read the table column names.
"""
# Column names start in 00002152.
screens_read = 1
cols = []
col_in_screen = 0
for i in range(0, self.num_cols):
if (col_in_screen == fields_per_screen):
screens_read += 1
col_in_screen = 1
# Do the skips
self.frm_file.read(8) # read ahead 8 bytes
val = '\x20'
while val == '\x20': # skip the spaces
val = self.frm_file.read(1)
self.frm_file.read(2) # read past 2 more bytes
else:
col_in_screen += 1
# get length byte
col_len = struct.unpack("<B", self.frm_file.read(1))[0]
col_str = ""
# Don't copy trailing \x00
j = 0
while j < col_len - 1:
char_found = struct.unpack("c", self.frm_file.read(1))[0]
col_str += char_found
j += 1
# skip trailing \x00 and extra bits except for last col read
if (i < self.num_cols - 1):
self.frm_file.read(3)
cols.append(col_str)
return (screens_read, cols)
def _get_decimal_value(self, recpos, col):
"""Get a decimal value from the default column data
recpos[in] Position in default row to find data
col[in] Column dictionary for the column data
Returns float - default value retrieved
"""
# Guard
if not _is_decimal(col):
return None
col_flags = (int(col['flags_extra'] << 8) + col['flags'])
length = col['bytes_in_col']
decimals = (col_flags >> _FIELDFLAG_DEC_SHIFT) & _FIELDFLAG_MAX_DEC
length = length - (1 if decimals else 0) - \
(1 if (col_flags & _FIELDFLAG_DECIMAL) or (length == 0) else 0)
# algorithm from bin2decimal()
#int intg=precision-scale,
# intg0=intg/DIG_PER_DEC1, frac0=scale/DIG_PER_DEC1,
# intg0x=intg-intg0*DIG_PER_DEC1, frac0x=scale-frac0*DIG_PER_DEC1;
#
#return intg0*sizeof(dec1)+dig2bytes[intg0x]+
# frac0*sizeof(dec1)+dig2bytes[frac0x];
intg = length - decimals
intg0 = intg / _DIG_PER_DEC1
frac0 = decimals / _DIG_PER_DEC1
intg0x = intg - (intg0 * _DIG_PER_DEC1)
frac0x = decimals - (frac0 * _DIG_PER_DEC1)
int_len = (intg0 * 4 + _DIG2BYTES[intg0x]) - 1 # len of integer part
frac_len = (frac0 * 4 + _DIG2BYTES[frac0x]) # len of fractional part
int_val = 0
shift_num = int_len - 1
for i in range(0, int_len):
int_val += ord(self.default_values[recpos + i + 1]) << \
(shift_num * 8)
shift_num -= 1
frac_val = 0
shift_num = frac_len - 1
for i in range(0, frac_len):
frac_val += ord(self.default_values[recpos + int_len + i + 1]) << \
(shift_num * 8)
shift_num -= 1
return float("%s.%s" % (int_val, frac_val))
def _get_field_defaults(self):
"""Retrieve the default values for the columns.
"""
max_len = len(self.default_values)
if self.verbosity > 2:
_print_default_values(self.default_values)
for i in range(0, len(self.column_data)):
col = self.column_data[i]
recpos = self.column_data[i]['recpos']
recpos -= 2
if recpos < 0:
recpos = 0
if recpos > max_len: # safety net
continue
# Read default for decimal types
if _is_decimal(col):
col['default'] = self._get_decimal_value(recpos, col)
continue
len_pos, size = _get_pack_length(col)
field_cs_num = (col['charset_low'] << 8) + col['charset']
# Adjust size based on character set maximum length per char
if _is_cs_enabled(col):
if self.csi:
maxlen = self.csi.get_maxlen(field_cs_num)
else:
maxlen = 1
size = size / maxlen
if len_pos is None:
value = self.default_values[recpos:recpos + size]
else:
value = self.default_values[recpos:recpos + len_pos + size]
# Read default for double type
if col['field_type'] == _MYSQL_TYPE_DOUBLE:
col['default'] = struct.unpack('d', value)[0]
continue
# Read default for float type
if col['field_type'] == _MYSQL_TYPE_FLOAT:
col['default'] = struct.unpack('f', value)[0]
continue
# Need to check for column type. Some are binary!
if len_pos is None: # Some form of string
col_str = ""
for col_def in range(0, len(value)):
if value[col_def] != '\x20':
col_str += value[col_def]
col['default'] = '' if len(col_str) == 0 else col_str
elif len_pos == 0: # packed numeric
len_pos = size
if len_pos == 1:
col['default'] = struct.unpack("<B", value[0:1])[0]
elif len_pos == 2:
col['default'] = struct.unpack("<H", value[0:2])[0]
elif len_pos == 3:
col['default'] = struct.unpack("<HB", value[0:3])[0]
elif len_pos == 4:
col['default'] = struct.unpack("<I", value[0:4])[0]
elif len_pos == 8:
col['default'] = struct.unpack("<Q", value[0:8])[0]
def _read_column_metadata(self):
"""Read the column metadata (size, flags, etc.).
Returns dictionary - column definition data
"""
column_data = []
# Skip ahead
try:
for i in range(0, self.num_cols):
if self.verbosity > 1:
print "# Reading column metadata #%s" % i
data = struct.unpack(_COL_DATA, self.frm_file.read(17))
data_type = _col_types[bisect.bisect_left(_col_keys,
data[13])]
col_def = {
'field_length': data[2], # 1, +3
'bytes_in_col': int(data[3]) + (int(data[4]) << 8),
'recpos': (int(data[6]) << 8) +
(int(data[5])) + (int(data[4]) << 16),
'unireg': data[7], # 1, +8
'flags': data[8], # 1, +9
'flags_extra': data[9], # 1, +10
'unireg_type': data[10], # 1, +11
'charset_low': data[11], # 1, +12
'interval_nr': data[12], # 1, +13
'field_type': data[13], # 1, +14
'field_type_name': data_type['text'],
'charset': data[14], # 1, +15
'comment_length': data[15], # 2, +17
'enums': [],
'comment': "",
'default': None,
}
column_data.append(col_def)
except Exception, error:
if self.verbosity > 1:
print "EXCEPTION:", error
raise UtilError("Cannot locate column data")
return column_data
def _read_column_data(self):
"""Read the column information from the file.
This method builds the list of columns including defaults,
data type, and determines enum and set values.
"""
# Fields can be found 1 IO_SIZE more than what has been read to date
# plus 258 bytes.
io_size = self.general_data['IO_SIZE']
record_offset = io_size + self.general_data['tmp_key_length'] + \
self.general_data['rec_length']
offset = (((record_offset / io_size) + 1) * io_size) + 258
try:
# Skip to column position
if self.verbosity > 1:
print "# Skipping to column data at : %0000x" % int(offset)
self.frm_file.seek(offset, 0)
data = struct.unpack("<HHHHHHHHHHHHH", self.frm_file.read(26))
except Exception, error:
if self.verbosity > 1:
print "EXCEPTION:", error
raise UtilError("Cannot read column header.")
self.num_cols = data[0]
self.col_metadata = {
'num_cols': data[0],
'pos': data[1],
'unknown': data[2],
'n_length': data[3],
'interval_count': data[4],
'interval_parts': data[5],
'int_length': data[6],
'com_length': data[8],
'null_fields': data[12],
}
if self.verbosity > 1:
pprint(self.col_metadata)
# Skip ahead
try:
self.frm_file.read(7)
fields_per_screen = struct.unpack("<B", self.frm_file.read(1))[0]
if self.verbosity > 1:
print "# Fields per screen =", fields_per_screen
self.frm_file.read(46)
col_names = self._read_column_names(fields_per_screen)[1]
self.frm_file.read(1) # skip 1 byte
self.column_data = self._read_column_metadata()
except Exception, error:
if self.verbosity > 1:
print "EXCEPTION:", error
raise UtilError("Cannot read column data.")
# TODO: Add ability to read defaults by modifying _get_field_defaults
# method to correctly read the default values. Currently, it
# does not read some non-character values correctly. When fixed,
# remove this comment and uncomment the following line.
# self._get_field_defaults()
# Skip column names
col_len = 0
for colname in col_names:
col_len += len(colname)
# Skip to enum section
self.frm_file.read(len(col_names) + col_len + 2)
intervals = []
interval_num = 0
for i in range(0, len(col_names)):
self.column_data[i]['name'] = col_names[i]
# Here we read enums and match them to interval_nr.
i_num = self.column_data[i]['interval_nr']
if int(i_num) > 0:
if interval_num < i_num:
interval_num += 1
cols = []
char_found = 99
col_str = ''
while not char_found == 0:
char_found = struct.unpack("B",
self.frm_file.read(1))[0]
if char_found == 255:
if len(col_str):
cols.append(col_str)
col_str = ''
else:
col_str += chr(char_found)
intervals.append(cols)
self.column_data[i]['enums'].extend(
intervals[interval_num - 1])
# Now read column comments
for i in range(0, len(col_names)):
if self.verbosity > 1:
print "# Column comment:", \
self.column_data[i]['comment_length']
if self.column_data[i]['comment_length'] > 0:
col_str = ''
for j in range(0, self.column_data[i]['comment_length']):
if self.verbosity > 3:
print "# Reading column data %s." % j
char_found = struct.unpack("B", self.frm_file.read(1))[0]
col_str += chr(char_found)
self.column_data[i]['comment'] = col_str
return True
def _get_charset_collation(self, col):
"""Get the character set and collation for column
col[in] Column data dictionary
Returns list - option strings for charset and collation if needed
"""
parts = []
field_cs_num = (col['charset_low'] << 8) + col['charset']
table_cs_num = self.general_data['table_charset']
# If no character set information, add unknown tag to prompt user
if self.csi is None:
if field_cs_num is not None and table_cs_num is not None and \
field_cs_num != 'binary' and table_cs_num != field_cs_num:
parts.append(" CHARACTER SET <UNKNOWN>")
return parts
field_cs_name = self.csi.get_name(field_cs_num)
table_cs_name = self.csi.get_name(table_cs_num)
if field_cs_name is not None and table_cs_name is not None and \
field_cs_name != 'binary' and table_cs_name != field_cs_name:
parts.append(" CHARACTER SET `%s`" % field_cs_name)
elif (field_cs_name is None or table_cs_name is None) and \
not self.quiet:
print "C",
print "# WARNING: Cannot get character set name for id =", id
parts.append(" CHARACTER SET <UNKNOWN>")
else:
parts.append("")
# Get collation
def_field_col = self.csi.get_default_collation(field_cs_num)
field_col = self.csi.get_collation(field_cs_num)
if def_field_col is not None and field_col is not None and \
def_field_col[1] != field_col:
parts.append(" COLLATE `%s`" % field_col)
elif def_field_col is None and not self.quiet:
print "# WARNING: Cannot get default collation for id =", id
elif field_col is None and not self.quiet:
print "# WARNING: Cannot get collation for id =", id
else:
parts.append("")
return parts
def _get_column_definitions(self):
"""Build the column definitions
This method constructs the column definitions from the column data
read from the file.
Returns list of strings - column definitions
"""
columns = []
stop = len(self.column_data)
for i in range(0, stop):
col = self.column_data[i]
col_flags = (int(col['flags_extra'] << 8) + col['flags'])
length = int(col['bytes_in_col'])
# Here we need to check for charset maxlen and adjust accordingly
field_cs_num = (col['charset_low'] << 8) + col['charset']
if self.csi:
maxlen = self.csi.get_maxlen(field_cs_num)
else:
maxlen = 1
# Only convert the length for character type fields
if _is_cs_enabled(col):
length = length / maxlen
decimals = int((col_flags >> _FIELDFLAG_DEC_SHIFT) &
_FIELDFLAG_MAX_DEC)
col_parts = []
# name, data type, length
# If enum or set values, put those in definition
if col['enums']:
col_str = " `%s` %s(" % (col['name'], col['field_type_name'])
col_str += ",".join(["'%s'" % i for i in col['enums']])
col_str += ")"
col_parts.append(col_str)
elif _is_no_parens(col) and not _is_blob(col):
col_parts.append(" `%s` %s" %
(col['name'],
col['field_type_name'].lower()))
# for blobs
elif _is_blob(col):
col_parts.append(" `%s` %s" % (col['name'],
_get_blob_text(col)))
# for real types:
elif _is_real(col):
length_str = ""
if _is_decimal(col):
length = length - (1 if decimals else 0) - \
(1 if (col_flags & _FIELDFLAG_DECIMAL) or
(length == 0) else 0)
if decimals == _FIELDFLAG_MAX_DEC:
if not col['field_type_name'].upper() in \
["FLOAT", "DOUBLE"]:
length_str = "(%s)" % length
else:
length_str = "(%s,%s)" % (length, decimals)
col_parts.append(" `%s` %s%s" %
(col['name'],
col['field_type_name'].lower(),
length_str))
else:
col_parts.append(" `%s` %s(%s)" % (col['name'],
col['field_type_name'].lower(),
length))
# unsigned
if col_flags & _FIELDFLAG_DECIMAL == 0 and _is_unsigned(col):
col_parts.append(" unsigned")
# zerofill
if col_flags & _FIELDFLAG_ZEROFILL and _is_unsigned(col):
col_parts.append(" zerofill")
# character set and collation options
if _is_cs_enabled(col):
col_parts.extend(self._get_charset_collation(col))
# null
if col_flags & _FIELDFLAG_MAYBE_NULL:
if not col['default']:
col_parts.append(" DEFAULT NULL")
elif not _is_blob(col):
col_parts.append(" NOT NULL")
# default - Check the _FIELDFLAG_NO_DEFAULT flag. If this flag
# is set, there is no default.
default = col['default']
if col['field_type'] in [_MYSQL_TYPE_TIMESTAMP,
_MYSQL_TYPE_TIMESTAMP2]:
col_parts.append(" DEFAULT CURRENT_TIMESTAMP "
"ON UPDATE CURRENT_TIMESTAMP")
elif col_flags & _FIELDFLAG_NO_DEFAULT == 0 and \
default is not None:
col_parts.append(_format_default(col, col_flags,
length, decimals))
# auto increment
if col['unireg_type'] == _NEXT_NUMBER:
col_parts.append(" AUTO_INCREMENT")
if len(col['comment']) > 0:
col_parts.append(" comment '%s'" % col['comment'])
# if not the last column or if there are keys, append comma
if i < stop - 1 or self.key_data['num_keys'] > 0:
col_parts.append(",")
col_parts.append(" ")
columns.append("".join(col_parts))
return columns
def _get_key_size(self, col, key_info, flags):
"""Get the key size option for column
col[in] Column data dictionary
key_info[in] Key information
flags[in] Key flags
Returns string - string of (N) for size or None for no size information
"""
size_info = None
if _no_keysize(col) or self.csi is None:
return size_info
key_len = int(key_info['length'])
pack_len = _get_pack_length(col)
if col['field_type_name'].upper() == "VARCHAR":
field_len = int(col['field_length'])
elif (_is_real(col) or _is_unsigned(col) or _is_decimal(col)) and \
pack_len[0]:
field_len = int(pack_len[0])
else:
field_len = int(pack_len[1])
field_cs_num = (col['charset_low'] << 8) + col['charset']
if self.csi:
maxlen = self.csi.get_maxlen(field_cs_num)
else:
maxlen = 1
# Geometry is an exception
if col['field_type'] == _MYSQL_TYPE_GEOMETRY:
if self.csi:
size_info = "(%d)" % key_len
else:
size_info = "(UNKNOWN)"
elif not field_len == key_len and \
not int(flags) & _HA_FULLTEXT and not int(flags) & _HA_SPATIAL:
if self.csi:
size_info = "(%d)" % (key_len / maxlen)
else:
size_info = "(UNKNOWN)"
return size_info
def _get_key_columns(self):
"""Build the key column definitions
This method constructs the key definitions from the column data
read from the file.
Returns list of strings - key column definitions
"""
keys = []
key_info = zip(self.key_data['key_names'], self.key_data['keys'])
num_keys = len(key_info)
i = 0
for key, info in key_info:
if key == "PRIMARY":
key_prefix = "PRIMARY KEY"
elif not info['flags'] & _HA_NOSAME:
key_prefix = "UNIQUE KEY"
else:
key_prefix = "KEY"
key_str = "%s `%s` (%s)"
key_cols = ""
for k in range(0, len(info['key_parts'])):
key_part = info['key_parts'][k]
col = self.column_data[key_part['field_num'] - 1]
key_cols += "`%s`" % col['name']
size_str = self._get_key_size(col, key_part, info['flags'])
if size_str:
key_cols += size_str
if k < len(info['key_parts']) - 1:
key_cols += ","
algorithm = _KEY_ALG[info['algorithm']]
if not algorithm == 'UNDEFINED':
key_str += " USING %s" % algorithm
if i < num_keys - 1:
key_str += ","
keys.append(key_str % (key_prefix, key, key_cols))
i += 1
return keys
def _get_table_options(self):
"""Read the general table options from the file.
Returns string - options string for CREATE statement
"""
options = []
gen = self.general_data # short name to save indent, space
options.append(") ENGINE=%s" % self.engine_str)
if self.partition_str is not None and len(self.partition_str):
options.append("%s" % self.partition_str)
if gen['avg_row_length'] > 0:
options.append("AVG_ROW_LENGTH = %s" % gen['avg_row_length'])
if gen['key_block_size'] > 0:
options.append("KEY_BLOCK_SIZE = %s" % gen['key_block_size'])
if gen['max_rows'] > 0:
options.append("MAX_ROWS = %s" % gen['max_rows'])
if gen['min_rows'] > 0:
options.append("MIN_ROWS = %s" % gen['min_rows'])
if gen['default_charset'] > 0:
# If no character set information, add unknown tag to prompt user
if self.csi:
c_id = int(gen['default_charset'])
cs_name = self.csi.get_name(c_id)
if cs_name is not None:
options.append("DEFAULT CHARSET=%s" % cs_name)
elif not self.quiet:
print "# WARNING: Cannot find character set by id =", c_id
# collation
def_col = self.csi.get_default_collation(c_id)
col = self.csi.get_collation(c_id)
if def_col is not None and col is not None and def_col != col:
options.append("COLLATE=`%s`" % col)
elif def_col is None and not self.quiet:
print "# WARNING: Cannot find default collation " + \
"for table using id =", c_id
elif col is None and not self.quiet:
print "# WARNING: Cannot find collation for table " + \
"using id =", c_id
row_format = ""
row_type = int(gen['row_type'])
if row_type == _ROW_TYPE_FIXED:
row_format = "FIXED"
elif row_type == _ROW_TYPE_DYNAMIC:
row_format = "DYNAMIC"
elif row_type == _ROW_TYPE_COMPRESSED:
row_format = "COMPRESSED"
elif row_type == _ROW_TYPE_REDUNDANT:
row_format = "REDUNDANT"
elif row_type == _ROW_TYPE_COMPACT:
row_format = "COMPACT"
if len(row_format) > 0:
options.append("ROW_FORMAT = %s" % row_type)
if self.comment_str is not None and len(self.comment_str):
options.append("COMMENT '%s'" % self.comment_str)
if len(options) > 1:
return options[0] + " " + ", ".join(options[1:]) + ";"
return options[0] + ";"
def _build_create_statement(self):
"""Build the create statement for the .frm file.
This method builds the CREATE TABLE information as read from
the file.
Returns string - CREATE TABLE string
"""
if self.general_data is None:
raise UtilError("Header information missing.")
# CREATE statement preamble
parts = []
# Create preamble
preamble = "CREATE TABLE %s`%s` ("
        if self.db_name is not None and len(self.db_name) > 0:
db_str = "`%s`." % self.db_name
else:
db_str = ""
parts.append(preamble % (db_str, self.table))
# Get columns
parts.extend(self._get_column_definitions())
# Get indexes
parts.extend(self._get_key_columns())
# Create postamble and table options
parts.append(self._get_table_options())
return "\n".join(parts)
def get_type(self):
"""Return the file type - TABLE or VIEW
"""
# Fail if we cannot read the file
try:
self.frm_file = open(self.frm_path, "rb")
except Exception, error:
raise UtilError("The file %s cannot be read.\n%s" %
(self.frm_path, error))
# Read the file type
file_type = struct.unpack("<H", self.frm_file.read(2))[0]
# Close file and exit
self.frm_file.close()
# Take action based on file type
if file_type == _TABLE_TYPE:
return "TABLE"
elif file_type == _VIEW_TYPE:
return "VIEW"
else:
return "UNKNOWN"
def show_statistics(self):
"""Show general file and table statistics
"""
print "# File Statistics:"
file_stats = os.stat(self.frm_path)
file_info = {
'Size': file_stats[stat.ST_SIZE],
'Last Modified': time.ctime(file_stats[stat.ST_MTIME]),
'Last Accessed': time.ctime(file_stats[stat.ST_ATIME]),
'Creation Time': time.ctime(file_stats[stat.ST_CTIME]),
'Mode': file_stats[stat.ST_MODE],
}
for value, data in file_info.iteritems():
print "#%22s : %s" % (value, data)
print
# Fail if we cannot read the file
try:
self.frm_file = open(self.frm_path, "rb")
except Exception, error:
raise UtilError("The file %s cannot be read.\n%s" %
(self.frm_path, error))
# Read the file type
file_type = struct.unpack("<H", self.frm_file.read(2))[0]
# Take action based on file type
if not file_type == _TABLE_TYPE:
return
# Read general information
self._read_header()
# Close file and exit
self.frm_file.close()
version = str(self.general_data['MYSQL_VERSION_ID'])
ver_str = "%d.%d.%d" % (int(version[0]), int(version[1:3]),
int(version[3:]))
def_part_eng = 'None'
if self.general_data['default_part_eng'] > 0:
def_part_eng = _engine_types[bisect.bisect_left(
_engine_keys,
self.general_data['default_part_eng'])]['text']
print "# Table Statistics:"
table_info = {
'MySQL Version': ver_str,
'frm Version': self.general_data['frm_version'],
'Engine': self.general_data['legacy_db_type'],
'IO_SIZE': self.general_data['IO_SIZE'],
'frm File_Version': self.general_data['frm_file_ver'],
'Def Partition Engine': def_part_eng,
}
for value, data in table_info.iteritems():
print "#%22s : %s" % (value, data)
print
def show_create_table_statement(self):
"""Show the CREATE TABLE statement
This method reads the .frm file specified in the constructor and
        builds a facsimile CREATE TABLE statement if the .frm file describes
a table. For views, the method displays the CREATE VIEW statement
contained in the file.
"""
if not self.quiet:
print "# Reading .frm file for %s:" % self.frm_path
# Fail if we cannot read the file
try:
self.frm_file = open(self.frm_path, "rb")
except Exception, error:
raise UtilError("The file %s cannot be read.\n%s" %
(self.frm_path, error))
# Read the file type
file_type = struct.unpack("<H", self.frm_file.read(2))[0]
# Take action based on file type
if file_type == _TABLE_TYPE:
if not self.quiet:
print "# The .frm file is a TABLE."
# Read general information
self._read_header()
if self.verbosity > 1:
print "# General Data from .frm file:"
pprint(self.general_data)
# Read key information
self._read_keys()
if self.verbosity > 1:
print "# Index (key) Data from .frm file:"
pprint(self.key_data)
# Read default field values information
self._read_default_values()
# Read partition information
self._read_engine_data()
if self.verbosity > 1:
print "# Engine string:", self.engine_str
print "# Partition string:", self.partition_str
# Read column information
self._read_column_data()
if self.verbosity > 1:
print "# Column Data from .frm file:"
pprint(self.column_data)
print "# Number of columns:", self.num_cols
pprint(self.column_data[1:])
# Read comment
self._read_comment()
if self.verbosity > 1:
print "# Comment:", self.comment_str
if self.csi is not None and self.verbosity > 2:
print "# Character sets read from server:"
self.csi.print_charsets()
create_table_statement = self._build_create_statement()
if not self.quiet:
print "# CREATE TABLE Statement:\n"
print create_table_statement
print
elif file_type == _VIEW_TYPE:
# Skip heading
self.frm_file.read(8)
view_data = {}
for line in self.frm_file.readlines():
field, value = line.strip('\n').split("=", 1)
view_data[field] = value
if self.verbosity > 1:
pprint(view_data)
if not self.quiet:
print "# CREATE VIEW Statement:\n"
print view_data['query']
print
else:
raise UtilError("Invalid file type. Magic bytes = %02x" %
file_type)
# Close file and exit
self.frm_file.close()
def change_storage_engine(self):
"""Change the storage engine in an .frm file to MEMORY
        This method edits a .frm file to change the storage engine to the
        MEMORY engine.
CAUTION: Method will change the contents of the file.
Returns tuple - (original engine type, original engine name,
                         server version from the file)
"""
# Here we must change the code in position 0x03 to the engine code
# and the engine string in body of the file (Calculated location)
if self.verbosity > 1 and not self.quiet:
print "# Changing engine for .frm file %s:" % self.frm_path
# Fail if we cannot read the file
try:
self.frm_file = open(self.frm_path, "r+b")
except Exception, error:
raise UtilError("The file %s cannot be read.\n%s" %
(self.frm_path, error))
# Read the file type
file_type = struct.unpack("<H", self.frm_file.read(2))[0]
# Do nothing if this is a view.
if file_type == _VIEW_TYPE:
return None
# Abort if not table.
if not file_type == _TABLE_TYPE:
raise UtilError("Invalid file type. Magic bytes = %02x" %
file_type)
# Replace engine value
self.frm_file.read(1) # skip 1 byte
engine_type = struct.unpack("<B", self.frm_file.read(1))[0]
# Read general information
self._read_header()
if self.verbosity > 1:
print "# General Data from .frm file:"
pprint(self.general_data)
engine_str = ""
server_version = str(self.general_data['MYSQL_VERSION_ID'])
offset = self.general_data['IO_SIZE'] + \
self.general_data['tmp_key_length'] + \
self.general_data['rec_length']
self.frm_file.seek(offset + 2, 0)
engine_len = struct.unpack("<H", self.frm_file.read(2))[0]
engine_str = "".join(struct.unpack("c" * engine_len,
self.frm_file.read(engine_len)))
if self.verbosity > 1:
print "# Engine string:", engine_str
# If this is a CSV storage engine, don't change the engine type
# and instead create an empty .CSV file
if engine_type == _DB_TYPE_CSV_DB:
new_csv = os.path.splitext(self.frm_path)
f_out = open(new_csv[0] + ".CSV", "w")
f_out.close()
elif engine_type == _DB_TYPE_ARCHIVE_DB:
new_csv = os.path.splitext(self.frm_path)
f_out = open(new_csv[0] + ".ARZ", "w")
f_out.close()
elif engine_type == _DB_TYPE_MRG_MYISAM:
new_csv = os.path.splitext(self.frm_path)
f_out = open(new_csv[0] + ".MRG", "w")
f_out.close()
elif engine_type == _DB_TYPE_BLACKHOLE_DB:
pass # Nothing to do for black hole storage engine
else:
# Write memory type
self.frm_file.seek(3)
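            # 6 is _DB_TYPE_HEAP, the legacy code for the MEMORY engine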
self.frm_file.write(struct.pack("<B", 6))
# Write memory name
self.frm_file.seek(offset + 2, 0)
self.frm_file.write(struct.pack("<H", 6))
self.frm_file.write("MEMORY")
# Close file and exit
self.frm_file.close()
return engine_type, engine_str, server_version
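# Example usage of FrmReader (a minimal sketch; the paths and names below
# are hypothetical):
#
#   options = {'verbosity': 0, 'quiet': False, 'server': None,
#              'new_engine': None, 'show_stats': False}
#   frm = FrmReader('test_db', 'employees',
#                   '/var/lib/mysql/test_db/employees.frm', options)
#   if frm.get_type() == "TABLE":
#       frm.show_create_table_statement()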
#
# Copyright (c) 2011, 2013, Oracle and/or its affiliates. All rights reserved.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; version 2 of the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
#
"""Module with parsers for General and Slow Query Log.
"""
import re
import decimal
import datetime
from mysql.utilities.exception import LogParserError
_DATE_PAT = r"\d{6}\s+\d{1,2}:\d{2}:\d{2}"
_HEADER_VERSION_CRE = re.compile(
r"(.+), Version: (\d+)\.(\d+)\.(\d+)(?:-(\S+))?")
_HEADER_SERVER_CRE = re.compile(r"Tcp port:\s*(\d+)\s+Unix socket:\s+(.*)")
_SLOW_TIMESTAMP_CRE = re.compile(r"#\s+Time:\s+(" + _DATE_PAT + r")")
_SLOW_USERHOST_CRE = re.compile(r"#\s+User@Host:\s+"
r"(?:([\w\d]+))?\s*"
r"\[\s*([\w\d]+)\s*\]\s*"
r"@\s*"
r"([\w\d\.\-]*)\s*"
r"\[\s*([\d.]*)\s*\]\s*"
r"(?:Id\:\s*(\d+)?\s*)?")
_SLOW_STATS_CRE = re.compile(r"#\sQuery_time:\s(\d*\.\d{1,6})\s*"
r"Lock_time:\s(\d*\.\d{1,6})\s*"
r"Rows_sent:\s(\d*)\s*"
r"Rows_examined:\s(\d*)")
_GENERAL_ENTRY_CRE = re.compile(
r'(?:(' + _DATE_PAT + r'))?\s*'
r'(\d+)\s([\w ]+)\t*(?:(.+))?$')
class LogParserBase(object):
"""Base class for parsing MySQL log files
LogParserBase should be inherited to create parsers for MySQL log files.
This class has the following capabilities:
- Take a stream and check whether it is a file type
- Retrieve next line from stream
- Parse header information from a log file (for General or Slow Query Log)
- Implements the iterator protocol
    This class should not be used directly, but inherited and extended to
match the log file which needs to be parsed.
"""
def __init__(self, stream):
"""Constructor
stream[in] A file type
        The stream argument must be a valid file type supporting, for
        example, the readline()-method. The return value of the built-in
        function open() can be used:
LogParserBase(open("/path/to/mysql.log"))
Raises LogParserError on errors.
"""
self._stream = None
self._version = None
self._program = None
self._port = None
self._socket = None
self._start_datetime = None
self._last_seen_datetime = None
# Check if we got a file type
line = None
try:
self._stream = stream
line = self._get_next_line()
except AttributeError:
raise LogParserError("Need a file type")
# Not every log file starts with a header
if line is not None and line.endswith('started with:'):
self._parse_header(line)
else:
self._stream.seek(0)
def _get_next_line(self):
"""Get next line from the log file
This method reads the next line from the stream. Trailing
        newline (\n) and carriage return (\r) are removed.
Returns next line as string or None
"""
line = self._stream.readline()
if not line:
return None
return line.rstrip('\r\n')
def _parse_header(self, line):
"""Parse the header of a MySQL log file
line[in] A string, usually result of self._get_next_line()
This method parses the header of a MySQL log file, that is the header
found in the General and Slow Query log files. It sets attributes
_version, _program, _port and _socket.
Note that headers can repeat in a log file, for example, after a
restart of the MySQL server.
Example header:
/usr/sbin/mysqld, Version: 5.5.17-log (Source distribution). started
with:
Tcp port: 0 Unix socket: /tmp/mysql.sock
Time Id Command Argument
Raises LogParserError on errors.
"""
if line is None:
return
# Header line containing executable and version, example:
# /raid0/mysql/mysql/bin/mysqld,
# Version: 5.5.17-log (Source distribution). started with:
info = _HEADER_VERSION_CRE.match(line)
if not info:
raise LogParserError("Could not read executable and version from "
"header")
program, major, minor, patch, extra = info.groups()
# Header line with server information, example:
# Tcp port: 3306 Unix socket: /tmp/mysql.sock
line = self._get_next_line()
info = _HEADER_SERVER_CRE.match(line)
if not info:
raise LogParserError("Malformed server header line: %s" % line)
tcp_port, unix_socket = info.groups()
# Throw away column header line, example:
# Time Id Command Argument
self._get_next_line()
self._version = (int(major), int(minor), int(patch), extra)
self._program = program
self._port = int(tcp_port)
self._socket = unix_socket
@property
def version(self):
"""Returns the MySQL server version
        This property returns a tuple describing the version of the
MySQL server producing the log file. The tuple looks like this:
(major, minor, patch, extra)
The extra part is optional and when not available will be None.
Examples:
(5,5,17,'log')
(5,1,57,None)
Note that the version can change in the same log file.
Returns a tuple or None.
"""
return self._version
@property
def program(self):
"""Returns the executable which wrote the log file
This property returns the full path to the executable which
produced the log file.
Note that the executable can change in the same log file.
Returns a string or None.
"""
return self._program
@property
def port(self):
"""Returns the MySQL server TCP/IP port
This property returns the TCP/IP port on which the MySQL server
was listening.
Note that the TCP/IP port can change in the same log file.
Returns an integer or None.
"""
return self._port
@property
def socket(self):
"""Returns the MySQL server UNIX socket
        This property returns the full path to the UNIX socket used by the
        MySQL server to accept incoming connections on UNIX-like servers.
Note that the UNIX socket location can change in the same log file.
Returns a string or None.
"""
return self._socket
@property
def start_datetime(self):
"""Returns timestamp of first read log entry
This property returns the timestamp of the first read log entry.
Returns datetime.datetime-object or None.
"""
return self._start_datetime
@property
def last_seen_datetime(self):
"""Returns timestamp of last read log entry
This property returns the timestamp of the last read log entry.
Returns datetime.datetime-object or None
"""
return self._last_seen_datetime
def __iter__(self):
"""Class is iterable
Returns a LogParserBase-object.
"""
return self
def next(self):
"""Returns the next log entry
Raises StopIteration when no more entries are available.
Returns a LogEntryBase-object.
"""
entry = self._parse_entry()
if entry is None:
raise StopIteration
return entry
def _parse_entry(self):
"""Returns a parsed log entry
"""
pass
def __str__(self):
"""String representation of LogParserBase
"""
return "<%(clsname)s, MySQL v%(version)s>" % dict(
clsname=self.__class__.__name__,
version='.'.join([str(v) for v in self._version[0:3]]) +
(self._version[3] or '')
)
class GeneralQueryLog(LogParserBase):
"""Class implementing a parser for the MySQL General Query Log
    The GeneralQueryLog-class implements a parser for the MySQL General Query
Log and has the following capabilities:
- Parse General Query Log entries
- Possibility to handle special commands
- Keep track of MySQL sessions and remove them
- Process log headers found later in the log file
"""
def __init__(self, stream):
"""Constructor
stream[in] file type
Raises LogParserError on errors.
"""
super(GeneralQueryLog, self).__init__(stream)
self._sessions = {}
self._cached_logentry = None
self._commands = {
#'Sleep': None,
'Quit': self._handle_quit,
'Init DB': self._handle_init_db,
'Query': self._handle_multi_line,
#'Field List': None,
#'Create DB': None,
#'Drop DB': None,
#'Refresh': None,
#'Shutdown': None,
#'Statistics': None,
#'Processlist': None,
'Connect': self._handle_connect,
#'Kill': None,
#'Debug': None,
#'Ping': None,
#'Time': None,
#'Delayed insert': None,
#'Change user': None,
#'Binlog Dump': None,
#'Table Dump': None,
#'Connect Out': None,
#'Register Slave': None,
'Prepare': self._handle_multi_line,
'Execute': self._handle_multi_line,
#'Long Data': None,
#'Close stmt': None,
#'Reset stmt': None,
#'Set option': None,
'Fetch': self._handle_multi_line,
#'Daemon': None,
#'Error': None,
}
def _new_session(self, session_id):
"""Create a new session using the given session ID
        session_id[in]  integer representing a MySQL session
Returns a dictionary.
"""
self._sessions[session_id] = dict(
database=None,
user=None,
host=None,
time_last_action=None,
to_delete=False
)
return self._sessions[session_id]
@staticmethod
def _handle_connect(entry, session, argument):
"""Handle a 'Connect'-command
entry[in] a GeneralQueryLogEntry-instance
session[in] a dictionary with current session information,
element of self._sessions
argument[in] a string, last part of a log entry
This method reads user and database information from the argument of
a 'Connect'-command. It sets the user, host and database for the
current session and also sets the argument for the entry.
"""
# Argument can be as follows:
# root@localhost on test
# root@localhost on
try:
connection, _, database = argument.split(' ')
except ValueError:
connection = argument.replace(' on', '')
database = None
session['user'], session['host'] = connection.split('@')
session['database'] = database
entry['argument'] = argument
@staticmethod
def _handle_init_db(entry, session, argument):
"""Handle an 'Init DB'-command
entry[in] a GeneralQueryLogEntry-instance
session[in] a dictionary with current session information,
element of self._sessions
argument[in] a string, last part of a log entry
The argument parameter is always the database name.
"""
# Example (of full line):
# 3 Init DB mysql
session['database'] = argument
entry['argument'] = argument
def _handle_multi_line(self, entry, session, argument):
"""Handle a command which can span multiple lines
entry[in] a GeneralQueryLogEntry-instance
session[in] a dictionary with current session information,
element of self._sessions
argument[in] a string, last part of a log entry
The argument parameter passed to this function is the last part of a
General Query Log entry and usually is already the full query.
This function's main purpose is to read log entries which span multiple
lines, such as the Query and Prepare-commands.
"""
# Examples:
# 111205 10:01:14 6 Query SELECT Name FROM time_zone_name
# WHERE Time_zone_id = 417
# 111205 10:03:28 6 Query SELECT Name FROM time_zone_name
# WHERE Time_zone_id = 417
argument_parts = [argument, ]
line = self._get_next_line()
while line:
if line.endswith('started with:'):
self._cached_logentry = line
break
info = _GENERAL_ENTRY_CRE.match(line)
if info is not None:
self._cached_logentry = info.groups()
break
argument_parts.append(line)
line = self._get_next_line()
entry['argument'] = '\n'.join(argument_parts)
@staticmethod
def _handle_quit(entry, session, argument):
"""Handle the 'Quit'-command
entry[in] a GeneralQueryLogEntry-instance
session[in] a dictionary with current session information,
element of self._sessions
argument[in] a string, last part of a log entry
This function sets a flag that the session can be removed from the
session list.
"""
# Example (of full line):
# 111205 10:06:53 6 Quit
session['to_delete'] = True
def _parse_command(self, logentry, entry):
"""Parse a log entry from the General Query Log
logentry[in] a string or tuple
entry[in] an instance of GeneralQueryLogEntry
The logentry-parameter is either a line read from the log file or
the result of a previous attempt to read a command.
The entry argument should be an instance of GeneralQueryLogEntry.
It returns the entry or None if nothing could be read.
Raises LogParserError on errors.
Returns the GeneralQueryLogEntry-instance or None
"""
if logentry is None:
return None
if isinstance(logentry, tuple):
dt, session_id, command, argument = logentry
elif logentry.endswith('started with:'):
while logentry.endswith('started with:'):
# We got a header
self._parse_header(logentry)
logentry = self._get_next_line()
if logentry is None:
return None
return self._parse_command(logentry, entry)
else:
info = _GENERAL_ENTRY_CRE.match(logentry)
if info is None:
raise LogParserError("Failed parsing command line: %s"
% logentry)
dt, session_id, command, argument = info.groups()
self._cached_logentry = None
session_id = int(session_id)
entry['session_id'] = session_id
try:
session = self._sessions[session_id]
except KeyError:
session = self._new_session(session_id)
entry['command'] = command
if dt is not None:
entry['datetime'] = datetime.datetime.strptime(dt,
"%y%m%d %H:%M:%S")
session['time_last_action'] = entry['datetime']
else:
entry['datetime'] = session['time_last_action']
try:
self._commands[command](entry, session, argument)
except KeyError:
# Generic command
entry['argument'] = argument
for key in entry.keys():
if key in session:
entry[key] = session[key]
if session['to_delete'] is True:
del self._sessions[session_id]
del session
return entry
def _parse_entry(self):
"""Returns a parsed log entry
The method _parse_entry() uses _parse_command() to parse
a General Query Log entry. It is used by the iterator protocol methods.
Returns a GeneralQueryLogEntry-instance or None.
"""
entry = GeneralQueryLogEntry()
if self._cached_logentry is not None:
self._parse_command(self._cached_logentry, entry)
return entry
else:
line = self._get_next_line()
if line is None:
return None
self._parse_command(line, entry)
return entry
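# Example usage of GeneralQueryLog (a minimal sketch; the log path is
# hypothetical):
#
#   log = GeneralQueryLog(open("/var/log/mysql/general.log"))
#   for entry in log:
#       print entry['datetime'], entry['session_id'], entry['command']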
class SlowQueryLog(LogParserBase):
"""Class implementing a parser for the MySQL Slow Query Log
The SlowQueryLog-class implements a parser for the MySQL Slow Query Log and
has the following capabilities:
- Parse Slow Query Log entries
- Process log headers found later in the log file
- Parse connection and temporal information
- Get statistics of the slow query
"""
def __init__(self, stream):
"""Constructor
stream[in] A file type
        The stream argument must be a valid file type supporting, for
        example, the readline()-method. The return value of the built-in
        function open() can be used:
SlowQueryLog(open("/path/to/mysql-slow.log"))
Raises LogParserError on errors.
"""
super(SlowQueryLog, self).__init__(stream)
self._cached_line = None
self._current_database = None
@staticmethod
def _parse_line(regex, line):
"""Parses a log line using given regular expression
        regex[in] a compiled regular expression (SRE_Pattern-object)
line[in] a string
        This function takes a log line and matches the regular expression
        given with the regex argument. It returns the result of
re.MatchObject.groups(), which is a tuple.
Raises LogParserError on errors.
Returns a tuple.
"""
info = regex.match(line)
if info is None:
raise LogParserError('Failed parsing Slow Query line: %s' %
line[:30])
return info.groups()
def _parse_connection_info(self, line, entry):
"""Parses connection info
line[in] a string
entry[in] a SlowQueryLog-instance
        The line parameter should be a string, a line read from the Slow Query
Log. The entry argument should be an instance of SlowQueryLogEntry.
Raises LogParserError on failure.
"""
# Example:
# # User@Host: root[root] @ localhost [127.0.0.1]
(priv_user,
unpriv_user,
host,
ip,
sid) = self._parse_line(_SLOW_USERHOST_CRE, line)
entry['user'] = priv_user if priv_user else unpriv_user
entry['host'] = host if host else ip
entry['session_id'] = sid
def _parse_timestamp(self, line, entry):
"""Parses a timestamp
line[in] a string
entry[in] a SlowQueryLog-instance
        The line parameter should be a string, a line read from the Slow Query
Log. The entry argument should be an instance of SlowQueryLogEntry.
Raises LogParserError on failure.
"""
# Example:
# # Time: 111206 11:55:54
info = self._parse_line(_SLOW_TIMESTAMP_CRE, line)
entry['datetime'] = datetime.datetime.strptime(info[0],
"%y%m%d %H:%M:%S")
if self._start_datetime is None:
self._start_datetime = entry['datetime']
self._last_seen_datetime = entry['datetime']
def _parse_statistics(self, line, entry):
"""Parses statistics information
line[in] a string
entry[in] a SlowQueryLog-instance
        The line parameter should be a string, a line read from the Slow Query
Log. The entry argument should be an instance of SlowQueryLogEntry.
Raises LogParserError on errors.
"""
# Example statistic line:
# Query_time: 0.101194 Lock_time: 0.000331 Rows_sent: 24
# Rows_examined: 11624
result = self._parse_line(_SLOW_STATS_CRE, line)
entry['query_time'] = decimal.Decimal(result[0])
entry['lock_time'] = decimal.Decimal(result[1])
entry['rows_sent'] = int(result[2])
entry['rows_examined'] = int(result[3])
def _parse_query(self, line, entry):
"""Parses the query
line[in] a string
entry[in] a SlowQueryLog-instance
The line parameter should be a string, a line read from the Slow Query
Log. The entry argument should be an instance of SlowQueryLogEntry.
Query entries in the Slow Query Log could span several lines. They can
optionally start with a USE-command and have session variables, such as
'timestamp', set before the actual query.
"""
# Example:
# SET timestamp=1323169459;
# SELECT SCHEMA_NAME FROM INFORMATION_SCHEMA.SCHEMATA
# WHERE SCHEMA_NAME = 'mysql';
# # User@Host: root[root] @ localhost [127.0.0.1]
query = []
while True:
if line is None:
break
if line.startswith('use'):
entry['database'] = self._current_database = line.split(' ')[1]
elif line.startswith('SET timestamp='):
entry['datetime'] = datetime.datetime.fromtimestamp(
int(line[14:].strip(';')))
elif (line.startswith('# Time:') or line.startswith("# User@Host")
or line.endswith('started with:')):
break
query.append(line)
line = self._get_next_line()
if 'database' in entry:
# This is not always correct: connections without current database
# will get the database name of the previous query. However, it's
# more likely current database is set. Fix would be that the server
# includes a USE-statement for every entry.
if (entry['database'] is None
and self._current_database is not None):
entry['database'] = self._current_database
entry['query'] = '\n'.join(query)
self._cached_line = line
def _parse_entry(self):
"""Parse and returns an entry of the Slow Query Log
Each entry of the slow log consists of:
1. An optional time line
2. A connection information line with user, hostname and database
3. A line containing statistics for the query
4. An optional "use <database>" line
5. A line setting the timestamp, insert_id, and last_insert_id
session variables
6. An optional administrator command line "# administrator command"
7. An optional SQL statement or the query
Returns a SlowQueryLogEntry-instance or None
"""
if self._cached_line is not None:
line = self._cached_line
self._cached_line = None
else:
line = self._get_next_line()
if line is None:
return None
while line.endswith('started with:'):
# We got a header
self._parse_header(line)
line = self._get_next_line()
if line is None:
return None
entry = SlowQueryLogEntry()
if line.startswith('# Time:'):
self._parse_timestamp(line, entry)
line = self._get_next_line()
if line.startswith('# User@Host:'):
self._parse_connection_info(line, entry)
line = self._get_next_line()
if line.startswith('# Query_time:'):
self._parse_statistics(line, entry)
line = self._get_next_line()
self._parse_query(line, entry)
return entry
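# Illustrative sketch (not in the original source): iterating a slow log
# and totalling query times via the statistics parsed above; the path is
# hypothetical and the iterator protocol from LogParserBase is assumed.
#
#     total = decimal.Decimal(0)
#     for entry in SlowQueryLog(open('/tmp/mysql-slow.log')):
#         total += entry.query_time
#     print "Total query time:", total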
class LogEntryBase(dict):
"""Class inherited by GeneralQueryEntryLog and SlowQueryEntryLog
This class has the following capabilities:
- Inherits from dict
- Dictionary elements can be accessed using attributes. For example,
logentry['database'] is accessible like logentry.database
Should not be used directly.
"""
def __init__(self):
super(LogEntryBase, self).__init__()
self['datetime'] = None
self['database'] = None
self['user'] = None
self['host'] = None
self['session_id'] = None
def __getattr__(self, name):
if name in self:
return self[name]
else:
raise AttributeError("%s has no attribute '%s'" %
(self.__class__.__name__, name))
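# Illustrative sketch (not in the original source): because __getattr__
# falls back to dictionary lookup, both access styles below are equivalent
# for any key stored in the entry.
#
#     entry = SlowQueryLogEntry()
#     entry['database'] = 'mysql'
#     assert entry.database == entry['database']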
class GeneralQueryLogEntry(LogEntryBase):
"""Class representing an entry of the General Query Log
"""
def __init__(self):
"""Constructor
GeneralQueryLogEntry inherits from LogEntryBase, which inherits from
dict. Instances of GeneralQueryLogEntry can be used just like
dictionaries.
"""
super(GeneralQueryLogEntry, self).__init__()
self['session_id'] = None
self['command'] = None
self['argument'] = None
def __str__(self):
"""String representation of GeneralQueryLogEntry
"""
param = self.copy()
param['clsname'] = self.__class__.__name__
try:
if len(param['argument']) > 30:
param['argument'] = param['argument'][:28] + '..'
except TypeError:
pass # Never mind when param['argument'] is not a string.
try:
param['datetime'] = param['datetime'].strftime("%Y-%m-%d %H:%M:%S")
except AttributeError:
param['datetime'] = ''
return ("<%(clsname)s %(datetime)s [%(session_id)s]"
" %(command)s: %(argument)s>" % param)
class SlowQueryLogEntry(LogEntryBase):
"""Class representing an entry of the Slow Query Log
SlowQueryLogEntry inherits from LogEntryBase, which inherits from dict.
Instances of SlowQueryLogEntry can be used just like dictionaries.
"""
def __init__(self):
"""Constructor
"""
super(SlowQueryLogEntry, self).__init__()
self['query'] = None
self['query_time'] = None
self['lock_time'] = None
self['rows_examined'] = None
self['rows_sent'] = None
def __str__(self):
"""String representation of SlowQueryLogEntry
"""
param = self.copy()
param['clsname'] = self.__class__.__name__
try:
param['datetime'] = param['datetime'].strftime("%Y-%m-%d %H:%M:%S")
except AttributeError:
param['datetime'] = ''
return ("<%(clsname)s %(datetime)s [%(user)s@%(host)s] "
"%(query_time)s/%(lock_time)s/%(rows_examined)s/%(rows_sent)s>"
) % param
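# Illustrative sketch (not in the original source): given a populated
# entry, str() yields a line like the following, with statistics ordered
# query_time/lock_time/rows_examined/rows_sent:
#
#     <SlowQueryLogEntry 2011-12-06 11:55:54 [root@localhost] 0.101194/0.000331/11624/24>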
#
# Copyright (c) 2010, 2013, Oracle and/or its affiliates. All rights reserved.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; version 2 of the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
#
"""
This module contains abstractions of a MySQL table and an index.
"""
import multiprocessing
import sys
from mysql.utilities.exception import UtilError
from mysql.connector.conversion import MySQLConverter
from mysql.utilities.common.format import print_list
from mysql.utilities.common.database import Database
from mysql.utilities.common.lock import Lock
from mysql.utilities.common.server import Server
from mysql.utilities.common.sql_transform import (convert_special_characters,
quote_with_backticks,
remove_backtick_quoting,
is_quoted_with_backticks)
# Constants
_MAXPACKET_SIZE = 1024 * 1024
_MAXBULK_VALUES = 25000
_MAXTHREADS_INSERT = 6
_MAXROWS_PER_THREAD = 100000
_MAXAVERAGE_CALC = 100
_FOREIGN_KEY_QUERY = """
SELECT CONSTRAINT_NAME, COLUMN_NAME, REFERENCED_TABLE_SCHEMA,
REFERENCED_TABLE_NAME, REFERENCED_COLUMN_NAME
FROM INFORMATION_SCHEMA.KEY_COLUMN_USAGE
WHERE TABLE_SCHEMA = '%s' AND TABLE_NAME = '%s' AND
REFERENCED_TABLE_SCHEMA IS NOT NULL
"""
class Index(object):
"""
The Index class encapsulates an index for a given table as defined by
the output of SHOW INDEXES FROM. The class has the following
capabilities:
- Check for duplicates
- Create DROP statement for index
- Print index CREATE statement
"""
def __init__(self, db, index_tuple, verbose=False):
"""Constructor
db[in] Name of database
index_tuple[in] A tuple from the get_tbl_indexes() result set
verbose[in] print extra data during operations (optional)
default value = False
"""
# Initialize and save values
self.db = db
self.q_db = quote_with_backticks(db)
self.verbose = verbose
self.columns = []
self.table = index_tuple[0]
self.q_table = quote_with_backticks(index_tuple[0])
self.unique = not index_tuple[1]
self.name = index_tuple[2]
self.q_name = quote_with_backticks(index_tuple[2])
col = (index_tuple[4], index_tuple[7])
self.columns.append(col)
self.type = index_tuple[10]
self.compared = False # mark as compared for speed
self.duplicate_of = None # saves duplicate index
if index_tuple[7] > 0:
self.column_subparts = True # check subparts e.g. a(20)
else:
self.column_subparts = False
@staticmethod
def __cmp_columns(col_a, col_b):
"""Compare two columns on name and subpart lengths if present
col_a[in] First column to compare
col_b[in] Second column to compare
Returns True if col_a has the same name as col_b and if the
subparts are col_a.sub <= col_b.sub.
"""
sz_this = col_a[1]
sz_that = col_b[1]
# if column has the same name
if col_a[0] == col_b[0]:
# if they both have sub_parts, compare them
if sz_this and sz_that:
if sz_this <= sz_that:
return True
else:
return False
# if this index has a sub_part and the other does
# not, it is potentially redundant
elif sz_this and sz_that is None:
return True
# if neither have sub_parts, it is a match
elif sz_this is None and sz_that is None:
return True
else:
return False # no longer a duplicate
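# Illustrative sketch (not in the original source): how __cmp_columns
# treats sub_parts. An index column on a shorter (or equal) prefix of the
# same column counts as a potential duplicate of the longer one.
#
#     __cmp_columns(('a', 10), ('a', 20))     -> True  (10 <= 20)
#     __cmp_columns(('a', 20), ('a', 10))     -> False
#     __cmp_columns(('a', 10), ('a', None))   -> True  (prefix vs. full column)
#     __cmp_columns(('a', None), ('a', None)) -> True  (exact match)
#     __cmp_columns(('a', None), ('b', None)) -> False (different names)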
def __check_column_list(self, index):
"""Compare the column list of this index with another
index[in] Instance of Index to compare
Returns True if column list is a subset of index.
"""
# Uniqueness counts - can't be a duplicate if uniqueness differs,
# except for primary keys which are always unique
if index.name != "PRIMARY":
if self.unique != index.unique:
return False
num_cols_this = len(self.columns)
num_cols_that = len(index.columns)
num_cols_same = 0
if self.type == "BTREE":
i = 0
while (i < num_cols_this) and (i < num_cols_that):
if num_cols_same <= i: # Ensures first N cols are the same
if self.__cmp_columns(self.columns[i], index.columns[i]):
num_cols_same = num_cols_same + 1
else:
break
i = i + 1
else: # HASH, RTREE, FULLTEXT
if (self.type == "FULLTEXT") and (num_cols_this != num_cols_that):
return False
i = 0
while (i < num_cols_this) and (i < num_cols_that):
if self.__cmp_columns(self.columns[i], index.columns[i]):
num_cols_same = num_cols_same + 1
else: # Ensures column lists must match
num_cols_same = 0
break
i = i + 1
if (num_cols_same > 0) and (num_cols_this <= num_cols_that):
return True
return False
def is_duplicate(self, index):
"""Compare this index with another
index[in] Instance of Index to compare
Returns True if this index is a subset of the Index presented.
"""
# Don't compare the same index - no two indexes can have the same name
if self.name == index.name:
return False
else:
return self.__check_column_list(index)
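# Illustrative sketch (not in the original source): detecting a redundant
# BTREE index. The tuples mimic rows from SHOW INDEXES FROM; only the
# positions read by Index.__init__ (0: table, 1: non_unique, 2: key name,
# 4: column, 7: sub_part, 10: index type) matter here, the rest are
# placeholders.
#
#     row_a = ('t1', 1, 'idx_a', 1, 'a', 'A', 0, None, None, '', 'BTREE')
#     row_ab = ('t1', 1, 'idx_ab', 1, 'a', 'A', 0, None, None, '', 'BTREE')
#     idx_a = Index('test', row_a)
#     idx_ab = Index('test', row_ab)
#     idx_ab.add_column('b', None)
#     idx_a.is_duplicate(idx_ab)  # -> True: (a) is a prefix of (a, b)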
def contains_columns(self, col_names):
"""Check if the current index contains the columns of the given index.
Returns True if it contains all the columns of the given index,
otherwise False.
"""
if len(self.columns) < len(col_names):
# If it has fewer columns than the given index, it does not contain all.
return False
else:
this_col_names = [col[0] for col in self.columns]
# Check if all index columns are included in the current one.
for col_name in col_names:
if col_name not in this_col_names:
return False # found one column not included.
# Passed previous verification; contains all columns of the given index.
return True
def add_column(self, column, sub_part):
"""Add a column to the list of columns for this index
column[in] Column to add
sub_part[in] Sub part of column, e.g. a(20)
"""
col = (column, sub_part)
if sub_part > 0:
self.column_subparts = True
self.columns.append(col)
def get_drop_statement(self):
"""Get the drop statement for this index
Note: Ignores PRIMARY key indexes.
Returns the DROP statement for this index.
"""
if self.name == "PRIMARY":
return None
query_str = "ALTER TABLE {db}.{table} DROP INDEX {name}".format(
db=self.q_db, table=self.q_table, name=self.q_name
)
return query_str
def get_remove_columns_statement(self, col_names):
"""Get the ALTER TABLE statement to remove columns for this index.
col_names[in] list of columns names to remove from the index.
Returns the ALTER TABLE statement (DROP/ADD) to remove the given
columns names from the index.
"""
# Create the new columns list for the index.
idx_cols = [col[0] for col in self.columns if col[0] not in col_names]
if not idx_cols:
# Return a DROP statement if no columns are left.
query_str = "ALTER TABLE {db}.{table} DROP INDEX {name}".format(
db=self.q_db, table=self.q_table, name=self.q_name
)
else:
# Otherwise, return a DROP/ADD statement with remaining columns.
idx_cols_str = ', '.join(idx_cols)
query_str = ("ALTER TABLE {db}.{table} DROP INDEX {name}, "
"ADD INDEX {name} ({cols})".format(db=self.q_db,
table=self.q_table,
name=self.q_name,
cols=idx_cols_str))
return query_str
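# Illustrative sketch (not in the original source): for a hypothetical
# index `idx_ab` on columns (a, b) of test.t1, removing column 'a' yields:
#
#     ALTER TABLE `test`.`t1` DROP INDEX `idx_ab`, ADD INDEX `idx_ab` (b)
#
# and removing every column instead produces a plain DROP INDEX statement.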
def __get_column_list(self, backtick_quoting=True):
"""Get the column list for an index
This method is used to print the CREATE and DROP statements.
backtick_quoting[in] Indicates if the columns names are to be quoted
with backticks or not. By default: True.
Returns a string representing the list of columns for a
column list. e.g. 'a, b(10), c'
"""
col_list = []
for col in self.columns:
name, sub_part = (col[0], col[1])
if backtick_quoting:
name = quote_with_backticks(name)
if sub_part > 0:
col_str = "{0}({1})".format(name, sub_part)
else:
col_str = name
col_list.append(col_str)
return ', '.join(col_list)
def print_index_sql(self):
"""Print the CREATE INDEX for indexes and ALTER TABLE for a primary key
"""
if self.name == "PRIMARY":
print("ALTER TABLE {db}.{table} ADD PRIMARY KEY ({cols})"
"".format(db=self.q_db, table=self.q_table,
cols=self.__get_column_list()))
else:
create_str = ("CREATE {unique}{fulltext}INDEX {name} ON "
"{db}.{table} ({cols}) {using}")
unique_str = 'UNIQUE ' if self.unique else ''
fulltext_str = 'FULLTEXT ' if self.type == 'FULLTEXT' else ''
if (self.type == "BTREE") or (self.type == "RTREE"):
using_str = 'USING {0}'.format(self.type)
else:
using_str = ''
print(create_str.format(unique=unique_str, fulltext=fulltext_str,
name=self.q_name, db=self.q_db,
table=self.q_table,
cols=self.__get_column_list(),
using=using_str))
def get_row(self):
"""Return index information as a list of columns for tabular output.
"""
cols = self.__get_column_list(backtick_quoting=False)
return (self.db, self.table, self.name, self.type, cols)
class Table(object):
"""
The Table class encapsulates a table for a given database. The class
has the following capabilities:
- Check to see if the table exists
- Check indexes for duplicates and redundancies
- Print list of indexes for the table
- Extract table data
- Import table data
- Copy table data
"""
def __init__(self, server1, name, options=None):
"""Constructor
server1[in] A Server object
name[in] Name of table in the form (db.table)
options[in] options for class: verbose, quiet, get_cols
quiet If True, do not print information messages
verbose print extra data during operations (optional)
(default is False)
get_cols If True, get the column metadata on construction
(default is False)
"""
if options is None:
options = {}
self.verbose = options.get('verbose', False)
self.quiet = options.get('quiet', False)
self.server = server1
# Keep table identifier considering backtick quotes
if is_quoted_with_backticks(name):
self.q_table = name
self.q_db_name, self.q_tbl_name = Database.parse_object_name(name)
self.db_name = remove_backtick_quoting(self.q_db_name)
self.tbl_name = remove_backtick_quoting(self.q_tbl_name)
self.table = ".".join([self.db_name, self.tbl_name])
else:
self.table = name
self.db_name, self.tbl_name = Database.parse_object_name(name)
self.q_db_name = quote_with_backticks(self.db_name)
self.q_tbl_name = quote_with_backticks(self.tbl_name)
self.q_table = ".".join([self.q_db_name, self.q_tbl_name])
self.obj_type = "TABLE"
self.pri_idx = None
# We store each type of index in a separate list to make it easier
# to manipulate
self.btree_indexes = []
self.hash_indexes = []
self.rtree_indexes = []
self.fulltext_indexes = []
self.text_columns = []
self.blob_columns = []
self.column_format = None
self.column_names = []
self.q_column_names = []
if options.get('get_cols', False):
self.get_column_metadata()
self.dest_vals = None
self.storage_engine = None
# Get max allowed packet
res = self.server.exec_query("SELECT @@session.max_allowed_packet")
if res:
self.max_packet_size = res[0][0]
else:
self.max_packet_size = _MAXPACKET_SIZE
# Watch for invalid values
if self.max_packet_size > _MAXPACKET_SIZE:
self.max_packet_size = _MAXPACKET_SIZE
self._insert = "INSERT INTO %s.%s VALUES "
self.query_options = { # Used for skipping fetch of rows
'fetch': False
}
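# Illustrative sketch (not in the original source): constructing a Table
# against an already-connected Server instance; the server and table names
# are hypothetical.
#
#     tbl = Table(server, 'test.t1', {'verbose': True, 'get_cols': True})
#     if tbl.exists():
#         print tbl.get_col_names()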
def exists(self, tbl_name=None):
"""Check to see if the table exists
tbl_name[in] table name (db.table)
(optional) If omitted, operation is performed
on the class instance table name.
return True = table exists, False = table does not exist
"""
db, table = (None, None)
if tbl_name:
db, table = Database.parse_object_name(tbl_name)
else:
db = self.db_name
table = self.tbl_name
res = self.server.exec_query("SELECT TABLE_NAME " +
"FROM INFORMATION_SCHEMA.TABLES " +
"WHERE TABLE_SCHEMA = '%s'" % db +
" and TABLE_NAME = '%s'" % table)
return (res is not None and len(res) >= 1)
def get_column_metadata(self, columns=None):
"""Get information about the table for the bulk insert operation.
This method builds lists that describe the metadata of the table. This
includes lists for:
column names
column format for building VALUES clause
blob fields - for use in generating INSERT/UPDATE for blobs
text fields - for use in checking for single quotes
columns[in] if None, use EXPLAIN else use column list.
"""
if columns is None:
columns = self.server.exec_query("explain %s" % self.q_table)
stop = len(columns)
self.column_names = []
self.q_column_names = []
col_format_values = [''] * stop
if columns is not None:
for col in range(0, stop):
if is_quoted_with_backticks(columns[col][0]):
self.column_names.append(
remove_backtick_quoting(columns[col][0]))
self.q_column_names.append(columns[col][0])
else:
self.column_names.append(columns[col][0])
self.q_column_names.append(
quote_with_backticks(columns[col][0]))
col_type_prefix = columns[col][1][0:4].lower()
if col_type_prefix in ('varc', 'char', 'enum', 'set('):
self.text_columns.append(col)
col_format_values[col] = "'%s'"
elif col_type_prefix in ("blob", "text"):
self.blob_columns.append(col)
col_format_values[col] = "%s"
elif col_type_prefix in ("date", "time"):
col_format_values[col] = "'%s'"
else:
col_format_values[col] = "%s"
self.column_format = "%s%s%s" % \
(" (", ', '.join(col_format_values), ")")
def get_col_names(self, quote_backticks=False):
"""Get column names for the export operation.
quote_backticks[in] If True the column names will be quoted with
backticks. Default is False.
Return (list) column names
"""
if self.column_format is None:
self.column_names = []
self.q_column_names = []
rows = self.server.exec_query("explain %s" % self.q_table)
for row in rows:
self.column_names.append(row[0])
self.q_column_names.append(quote_with_backticks(row[0]))
return self.q_column_names if quote_backticks else self.column_names
def _build_update_blob(self, row, new_db, name, blob_col):
"""Build an UPDATE statement to update blob fields.
row[in] a row to process
new_db[in] new database name
name[in] name of the table
blob_col[in] number of the column containing the blob
Returns UPDATE string
"""
if self.column_format is None:
self.get_column_metadata()
blob_insert = "UPDATE %s.%s SET " % (new_db, name)
where_values = []
do_commas = False
has_data = False
stop = len(row)
for col in range(0, stop):
col_name = self.q_column_names[col]
if col in self.blob_columns:
if row[col] is not None and len(row[col]) > 0:
if do_commas:
blob_insert += ", "
blob_insert += "%s = " % col_name + "%s" % \
MySQLConverter().quote(row[col])
has_data = True
do_commas = True
else:
# Convert None values to NULL (not '' to NULL)
if row[col] is None:
value = 'NULL'
else:
value = "'{0}'".format(row[col])
where_values.append("{0} = {1}".format(col_name, value))
if has_data:
return blob_insert + " WHERE " + " AND ".join(where_values) + ";"
return None
def get_column_string(self, row, new_db):
"""Return a formatted list of column data.
row[in] a row to process
new_db[in] new database name
Returns (string) column list
"""
if self.column_format is None:
self.get_column_metadata()
blob_inserts = []
values = list(row)
# Find blobs
for col in self.blob_columns:
# Save blob updates for later...
blob = self._build_update_blob(row, new_db, self.q_tbl_name, col)
if blob is not None:
blob_inserts.append(blob)
values[col] = "NULL"
# Replace single quotes located in the value for a text field with the
# correct special character escape sequence. This fixes SQL errors
# related to using single quotes in a string value that is single
# quoted. For example, 'this' is it' is changed to 'this\' is it'
for col in self.text_columns:
# Check if the value is not None before replacing quotes
if values[col]:
# Apply escape sequences to special characters
values[col] = convert_special_characters(values[col])
# Build string (add quotes to "string" like types)
val_str = self.column_format % tuple(values)
# Change 'None' occurrences with "NULL"
val_str = val_str.replace(", None", ", NULL")
val_str = val_str.replace("(None", "(NULL")
val_str = val_str.replace(", 'None'", ", NULL")
val_str = val_str.replace("('None'", "(NULL")
return (val_str, blob_inserts)
def make_bulk_insert(self, rows, new_db, columns_names=None):
"""Create bulk insert statements for the data
Reads data from a table (rows) and builds group INSERT statements for
bulk inserts.
Note: This method does not print any information to stdout.
rows[in] a list of rows to process
new_db[in] new database name
columns_names[in] (optional) list of column names for the INSERT
Returns (tuple) - (bulk insert statements, blob data inserts)
"""
if self.column_format is None:
self.get_column_metadata()
data_inserts = []
blob_inserts = []
row_count = 0
data_size = 0
val_str = None
for row in rows:
if row_count == 0:
if columns_names:
insert_str = "INSERT INTO {0}.{1} ({2}) VALUES ".format(
new_db, self.q_tbl_name, ", ".join(columns_names)
)
else:
insert_str = self._insert % (new_db, self.q_tbl_name)
if val_str:
row_count += 1
insert_str += val_str
data_size = len(insert_str)
col_data = self.get_column_string(row, new_db)
val_str = col_data[0]
if len(col_data[1]) > 0:
blob_inserts.extend(col_data[1])
row_size = len(val_str)
next_size = data_size + row_size + 3
if (row_count >= _MAXBULK_VALUES) or \
(next_size > (int(self.max_packet_size) - 512)): # add buffer
data_inserts.append(insert_str)
row_count = 0
else:
row_count += 1
if row_count > 1:
insert_str += ", "
insert_str += val_str
data_size += row_size + 3
if row_count > 0:
data_inserts.append(insert_str)
return (data_inserts, blob_inserts)
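# Illustrative sketch (not in the original source): make_bulk_insert()
# groups rows into multi-row INSERT statements, starting a new statement
# whenever _MAXBULK_VALUES rows are reached or the next row would push the
# statement past max_allowed_packet minus a 512-byte buffer. Roughly
# (whitespace approximate), assuming tbl wraps test.t1 with columns
# (id INT, name CHAR):
#
#     rows = [(1, 'a'), (2, 'b')]
#     inserts, blobs = tbl.make_bulk_insert(rows, '`test`')
#     # inserts -> ["INSERT INTO `test`.`t1` VALUES (1, 'a'), (2, 'b')"]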
def get_storage_engine(self):
"""Get the storage engine (in UPPERCASE) for the table.
Returns the name in UPPERCASE of the storage engine used for the table
or None if the information is not found.
"""
self.server.exec_query("USE {0}".format(self.q_db_name),
self.query_options)
res = self.server.exec_query(
"SHOW TABLE STATUS WHERE name = '{0}'".format(self.tbl_name)
)
try:
# Return the storage engine name converted to uppercase.
return res[0][1].upper() if res[0][1] else None
except IndexError:
# Return None if table status information is not available.
return None
def get_segment_size(self, num_conn=1):
"""Get the segment size based on number of connections (threads).
num_conn[in] Number of threads(connections) to use
Default = 1 (one large segment)
Returns (int) segment_size
Note: if num_conn <= 1 - returns number of rows
"""
# Get number of rows
num_rows = 0
try:
res = self.server.exec_query("USE %s" % self.q_db_name,
self.query_options)
except UtilError:
# Ignore errors selecting the database; the SHOW TABLE STATUS query
# below will still fail with a clearer error if the name is invalid.
pass
res = self.server.exec_query("SHOW TABLE STATUS LIKE '%s'" %
self.tbl_name)
if res:
num_rows = int(res[0][4])
if num_conn <= 1:
return num_rows
# Calculate number of threads and segment size to fetch
thread_limit = num_conn
if thread_limit > _MAXTHREADS_INSERT:
thread_limit = _MAXTHREADS_INSERT
if num_rows > (_MAXROWS_PER_THREAD * thread_limit):
max_threads = thread_limit
else:
max_threads = int(num_rows / _MAXROWS_PER_THREAD)
if max_threads == 0:
max_threads = 1
if max_threads > 1 and self.verbose:
print "# Using multi-threaded insert option. Number of " \
"threads = %d." % max_threads
return (num_rows / max_threads) + max_threads
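# Worked example (not in the original source): with 250,000 rows and
# num_conn = 4, thread_limit = 4 and 250,000 is not greater than
# 100,000 * 4, so max_threads = 250000 / 100000 = 2 and the method
# returns a segment size of 250000 / 2 + 2 = 125002 rows per fetch.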
def _bulk_insert(self, rows, new_db, destination=None):
"""Import data using bulk insert
Reads data from a table and builds group INSERT statements for writing
to the destination server specified (new_db.name).
This method is designed to be used in a thread for parallel inserts.
As such, it requires its own connection to the destination server.
Note: This method does not print any information to stdout.
rows[in] a list of rows to process
new_db[in] new database name
destination[in] the destination server
"""
if self.dest_vals is None:
self.dest_vals = self.get_dest_values(destination)
# Spawn a new connection
server_options = {
'conn_info': self.dest_vals,
'role': "thread",
}
dest = Server(server_options)
dest.connect()
# Issue the write lock
lock_list = [("%s.%s" % (new_db, self.q_tbl_name), 'WRITE')]
my_lock = Lock(dest, lock_list, {'locking': 'lock-all', })
# First, turn off foreign keys if turned on
dest.disable_foreign_key_checks(True)
if self.column_format is None:
self.get_column_metadata()
data_lists = self.make_bulk_insert(rows, new_db)
insert_data = data_lists[0]
blob_data = data_lists[1]
# Insert the data first
for data_insert in insert_data:
try:
dest.exec_query(data_insert, self.query_options)
except UtilError, e:
raise UtilError("Problem inserting data. "
"Error = %s" % e.errmsg)
# Now insert the blob data if there is any
for blob_insert in blob_data:
try:
dest.exec_query(blob_insert, self.query_options)
except UtilError, e:
raise UtilError("Problem updating blob field. "
"Error = %s" % e.errmsg)
# Now, turn on foreign keys if they were on at the start
dest.disable_foreign_key_checks(False)
my_lock.unlock()
del dest
def insert_rows(self, rows, new_db, destination=None, spawn=False):
"""Insert rows in the table using bulk copy.
This method opens a new connect to the destination server to insert
the data with a bulk copy. If spawn is True, the method spawns a new
process and returns it. This allows for using a multi-threaded insert
which can be faster on some platforms. If spawn is False, the method
will open a new connection to insert the data.
rows[in] List of rows to insert
new_db[in] Rename the db to this name
destination[in] Destination server
Default = None (copy to same server)
spawn[in] If True, spawn a new process for the insert
Default = False
Returns If spawn == True, process
If spawn == False, None
"""
if self.column_format is None:
self.get_column_metadata()
if self.dest_vals is None:
self.dest_vals = self.get_dest_values(destination)
proc = None
if spawn:
proc = multiprocessing.Process(target=self._bulk_insert,
args=(rows, new_db, destination))
else:
self._bulk_insert(rows, new_db, destination)
return proc
def _clone_data(self, new_db):
"""Clone table data.
This method will copy all of the data for a table
from the old database to the new database on the same server.
new_db[in] New database name for the table
"""
query_str = "INSERT INTO %s.%s SELECT * FROM %s.%s" % \
(new_db, self.q_tbl_name, self.q_db_name, self.q_tbl_name)
if self.verbose and not self.quiet:
print query_str
self.server.exec_query(query_str)
def copy_data(self, destination, cloning=False, new_db=None,
connections=1):
"""Retrieve data from a table and copy to another server and database.
Reads data from a table and inserts it on the destination server
using bulk INSERT statements.
Note: if connections < 1 - retrieve the data one row at a time
destination[in] Destination server
cloning[in] If True, we are copying on the same server
new_db[in] Rename the db to this name
connections[in] Number of threads(connections) to use for insert
"""
if new_db is None:
new_db = self.q_db_name
else:
# If need quote new_db identifier with backticks
if not is_quoted_with_backticks(new_db):
new_db = quote_with_backticks(new_db)
num_conn = int(connections)
if cloning:
self._clone_data(new_db)
else:
# Read and copy the data
pthreads = []
for rows in self.retrieve_rows(num_conn):
p = self.insert_rows(rows, new_db, destination, num_conn > 1)
if p is not None:
p.start()
pthreads.append(p)
if num_conn > 1:
# Wait for all threads to finish
for p in pthreads:
p.join()
def retrieve_rows(self, num_conn=1):
"""Retrieve the table data in rows.
This method can be used to retrieve rows from a table as a generator
specifying how many rows to retrieve at one time (segment_size is
calculated based on number of rows / number of connections).
Note: if num_conn < 1 - retrieve the data one row at a time
num_conn[in] Number of threads(connections) to use
Default = 1 (one large segment)
Returns (yield) row data
"""
if num_conn > 1:
# Only get the segment size when needed.
segment_size = self.get_segment_size(num_conn)
# Execute query to get all of the data
cur = self.server.exec_query("SELECT * FROM {0}".format(self.q_table),
self.query_options)
while True:
if num_conn < 1:
# Fetch one row at a time
row = cur.fetchone()
if row is None:
break
yield [row]
elif num_conn == 1:
# Fetch all rows as a single segment
yield cur.fetchall()
break
else:
# Fetch rows in segments sized for the worker threads
rows = cur.fetchmany(segment_size)
if not rows:
break
yield rows
cur.close()
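# Illustrative sketch (not in the original source): streaming rows from
# the source table in segments and bulk-inserting each segment; the names
# tbl and dest_server are hypothetical.
#
#     for segment in tbl.retrieve_rows(num_conn=2):
#         tbl.insert_rows(segment, '`new_db`', destination=dest_server)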
def get_dest_values(self, destination=None):
"""Get the destination connection values if not already set.
destination[in] Connection values for destination server
Returns connection values for destination if set or self.server
"""
# Get connection to database
if destination is None:
conn_val = {
"host": self.server.host,
"user": self.server.user,
"passwd": self.server.passwd,
"unix_socket": self.server.socket,
"port": self.server.port
}
else:
conn_val = {
"host": destination.host,
"user": destination.user,
"passwd": destination.passwd,
"unix_socket": destination.socket,
"port": destination.port
}
return conn_val
def get_tbl_indexes(self):
"""Return a result set containing all indexes for a given table
Returns result set
"""
res = self.server.exec_query("SHOW INDEXES FROM %s" % self.q_table)
return res
def get_tbl_foreign_keys(self):
"""Return a result set containing all foreign keys for the table
Returns result set
"""
res = self.server.exec_query(_FOREIGN_KEY_QUERY % (self.db_name,
self.tbl_name))
return res
@staticmethod
def __append(indexes, index):
"""Encapsulated append() method to ensure the primary key index
is placed at the front of the list.
"""
# Put the primary key first so that it can be compared to all indexes
if index.name == "PRIMARY":
indexes.insert(0, index)
else:
indexes.append(index)
@staticmethod
def __check_index(index, indexes, master_list):
"""Check a single index for duplicate or redundancy against a list
of other Indexes.
index[in] The Index to compare
indexes[in] A list of Index instances to compare
master_list[in] A list of known duplicate Index instances
Returns a tuple of whether duplicates are found and if found the
list of duplicate indexes for this table
"""
duplicates_found = False
duplicate_list = []
if indexes and index:
for idx in indexes:
# Don't compare b == a when a == b has already occurred
if not index.compared and idx.is_duplicate(index):
# make sure we haven't already found this match
if not idx.column_subparts:
idx.compared = True
if not (idx in master_list):
duplicates_found = True
idx.duplicate_of = index
duplicate_list.append(idx)
return (duplicates_found, duplicate_list)
def __check_index_list(self, indexes):
"""Check a list of Index instances for duplicates.
indexes[in] A list of Index instances to compare
Returns a tuple of whether duplicates are found and if found the
list of duplicate indexes for this table
"""
duplicates_found = False
duplicate_list = []
# There must be at least 2 indexes in the list to have duplicates.
if len(indexes) < 2:
return (False, None)
for index in indexes:
res = self.__check_index(index, indexes, duplicate_list)
if res[0]:
duplicates_found = True
duplicate_list.extend(res[1])
return (duplicates_found, duplicate_list)
def __check_clustered_index_list(self, indexes):
""" Check for indexes containing the clustered index from the list.
indexes[in] list of indexes instances to check.
Returns the list of indexes that contain the clustered index, or an
empty list if none are found.
"""
redundant_indexes = []
if not self.pri_idx:
self.get_primary_index()
pri_idx_cols = [col[0] for col in self.pri_idx]
for index in indexes:
if index.name == 'PRIMARY':
# Skip primary key.
continue
elif index.contains_columns(pri_idx_cols):
redundant_indexes.append(index)
return redundant_indexes if redundant_indexes else []
def _get_index_list(self):
"""Get the list of indexes for a table.
Returns list containing indexes.
"""
rows = self.get_tbl_indexes()
return rows
def get_primary_index(self):
"""Retrieve the primary index columns for this table.
"""
pri_idx = []
rows = self.server.exec_query("EXPLAIN " + self.q_table)
# Return an empty list if no indexes are found.
if not rows:
return pri_idx
for row in rows:
if row[3] == 'PRI':
pri_idx.append(row)
self.pri_idx = pri_idx
return pri_idx
def get_indexes(self):
"""Retrieve the indexes from the server and load them into lists
based on type.
Returns True - table has indexes, False - table has no indexes
"""
self.btree_indexes = []
self.hash_indexes = []
self.rtree_indexes = []
self.fulltext_indexes = []
if self.verbose:
print "# Getting indexes for %s" % (self.table)
rows = self._get_index_list()
# Return False if no indexes found.
if not rows:
return False
idx = None
prev_name = ""
for row in rows:
if (row[2] != prev_name) or (prev_name == ""):
prev_name = row[2]
idx = Index(self.db_name, row)
if idx.type == "BTREE":
self.__append(self.btree_indexes, idx)
elif idx.type == "HASH":
self.__append(self.hash_indexes, idx)
elif idx.type == "RTREE":
self.__append(self.rtree_indexes, idx)
else:
self.__append(self.fulltext_indexes, idx)
elif idx:
idx.add_column(row[4], row[7])
return True
def check_indexes(self, show_drops=False):
"""Check for duplicate or redundant indexes and display all matches
show_drops[in] (optional) If True the DROP statements are printed
Note: You must call get_indexes() prior to calling this method. If
get_indexes() is not called, no duplicates will be found.
"""
dupes = []
res = self.__check_index_list(self.btree_indexes)
# if there are duplicates, add them to the dupes list
if res[0]:
dupes.extend(res[1])
res = self.__check_index_list(self.hash_indexes)
# if there are duplicates, add them to the dupes list
if res[0]:
dupes.extend(res[1])
res = self.__check_index_list(self.rtree_indexes)
# if there are duplicates, add them to the dupes list
if res[0]:
dupes.extend(res[1])
# We sort the fulltext index columns - easier to do it once here
for index in self.fulltext_indexes:
cols = index.columns
cols.sort(key=lambda col: col[0])
res = self.__check_index_list(self.fulltext_indexes)
# if there are duplicates, add them to the dupes list
if res[0]:
dupes.extend(res[1])
# Check if secondary keys contains the clustered index (i.e. Primary
# key). In InnoDB, each record in a secondary index contains the
# primary key columns. Therefore the use of keys that include the
# primary key might be redundant.
redundant_idxs = []
if not self.storage_engine:
self.storage_engine = self.get_storage_engine()
if self.storage_engine == 'INNODB':
all_indexes = self.btree_indexes
all_indexes.extend(self.hash_indexes)
all_indexes.extend(self.rtree_indexes)
all_indexes.extend(self.fulltext_indexes)
redundant_idxs = self.__check_clustered_index_list(all_indexes)
# Print duplicate and redundant keys on composite indexes.
if len(dupes) > 0:
plural_1, verb_conj, plural_2 = (
('', 'is a', '') if len(dupes) == 1 else ('es', 'are', 's')
)
print("# The following index{0} {1} duplicate{2} or redundant "
"for table {3}:".format(plural_1, verb_conj, plural_2,
self.table))
for index in dupes:
print("#")
index.print_index_sql()
print("# may be redundant or duplicate of:")
index.duplicate_of.print_index_sql()
if show_drops:
print("#\n# DROP statement{0}:\n#".format(plural_2))
for index in dupes:
print("{0};".format(index.get_drop_statement()))
print("#")
# Print redundant indexes containing clustered key.
if redundant_idxs:
plural, verb_conj, plural_2 = (
('', 's', '') if len(redundant_idxs) == 1 else ('es', '', 's')
)
print("# The following index{0} for table {1} contain{2} the "
"clustered index and might be redundant:".format(plural,
self.table,
verb_conj))
for index in redundant_idxs:
print("#")
index.print_index_sql()
if show_drops:
print("#\n# DROP/ADD statement{0}:\n#".format(plural_2))
# Get columns from primary key to be removed.
pri_idx_cols = [col[0] for col in self.pri_idx]
for index in redundant_idxs:
print("{0};".format(
index.get_remove_columns_statement(pri_idx_cols)
))
print("#")
if not self.quiet and not dupes and not redundant_idxs:
print("# Table {0} has no duplicate nor redundant "
"indexes.".format(self.table))
def show_special_indexes(self, fmt, limit, best=False):
"""Display a list of the best or worst queries for this table.
This shows the best (first n) or worst (last n) performing queries
for a given table.
fmt[in] format of output = sql, table, tab, csv
limit[in] number to limit the display
best[in] (optional) if True, print best performing indexes
if False, print worst performing indexes
"""
_QUERY = """
SELECT
t.TABLE_SCHEMA AS `db`, t.TABLE_NAME AS `table`,
s.INDEX_NAME AS `index name`, s.COLUMN_NAME AS `field name`,
s.SEQ_IN_INDEX `seq in index`, s2.max_columns AS `# cols`,
s.CARDINALITY AS `card`, t.TABLE_ROWS AS `est rows`,
ROUND(((s.CARDINALITY / IFNULL(
IF(t.TABLE_ROWS < s.CARDINALITY, s.CARDINALITY, t.TABLE_ROWS),
0.01)) * 100), 2) AS `sel_percent`
FROM INFORMATION_SCHEMA.STATISTICS s
INNER JOIN INFORMATION_SCHEMA.TABLES t
ON s.TABLE_SCHEMA = t.TABLE_SCHEMA
AND s.TABLE_NAME = t.TABLE_NAME
INNER JOIN (
SELECT TABLE_SCHEMA, TABLE_NAME, INDEX_NAME,
MAX(SEQ_IN_INDEX) AS max_columns
FROM INFORMATION_SCHEMA.STATISTICS
WHERE TABLE_SCHEMA = %s AND TABLE_NAME = %s
AND INDEX_NAME != 'PRIMARY'
GROUP BY TABLE_SCHEMA, TABLE_NAME, INDEX_NAME
) AS s2
ON s.TABLE_SCHEMA = s2.TABLE_SCHEMA
AND s.TABLE_NAME = s2.TABLE_NAME
AND s.INDEX_NAME = s2.INDEX_NAME
WHERE t.TABLE_SCHEMA != 'mysql'
AND t.TABLE_ROWS > 10 /* Only tables with some rows */
AND s.CARDINALITY IS NOT NULL
AND (s.CARDINALITY / IFNULL(
IF(t.TABLE_ROWS < s.CARDINALITY, s.CARDINALITY, t.TABLE_ROWS),
0.01)) <= 1.00
ORDER BY `sel_percent`
"""
query_options = {
'params': (self.db_name, self.tbl_name,)
}
rows = []
idx_type = "best"
if not best:
idx_type = "worst"
if best:
rows = self.server.exec_query(_QUERY + "DESC LIMIT %s" % limit,
query_options)
else:
rows = self.server.exec_query(_QUERY + "LIMIT %s" % limit,
query_options)
if rows:
print("#")
if limit == 1:
print("# Showing the {0} performing index from "
"{1}:".format(idx_type, self.table))
else:
print("# Showing the top {0} {1} performing indexes from "
"{2}:".format(limit, idx_type, self.table))
print("#")
cols = ("database", "table", "name", "column", "sequence",
"num columns", "cardinality", "est. rows", "percent")
print_list(sys.stdout, fmt, cols, rows)
else:
print("# WARNING: Not enough data to calculate "
"best/worst indexes.")
@staticmethod
def __print_index_list(indexes, fmt, no_header=False):
"""Print the list of indexes
indexes[in] list of indexes to print
fmt[in] format of output = sql, table, tab, csv
no_header[in] (optional) if True, do not print the header
"""
if fmt == "sql":
for index in indexes:
index.print_index_sql()
else:
cols = ("database", "table", "name", "type", "columns")
rows = []
for index in indexes:
rows.append(index.get_row())
print_list(sys.stdout, fmt, cols, rows, no_header)
def print_indexes(self, fmt):
"""Print all indexes for this table
fmt[in] format of output = sql, table, tab, csv
"""
print "# Showing indexes from %s:\n#" % (self.table)
if fmt == "sql":
self.__print_index_list(self.btree_indexes, fmt)
self.__print_index_list(self.hash_indexes, fmt, False)
self.__print_index_list(self.rtree_indexes, fmt, False)
self.__print_index_list(self.fulltext_indexes, fmt, False)
else:
master_indexes = []
master_indexes.extend(self.btree_indexes)
master_indexes.extend(self.hash_indexes)
master_indexes.extend(self.rtree_indexes)
master_indexes.extend(self.fulltext_indexes)
self.__print_index_list(master_indexes, fmt)
print "#"
def has_primary_key(self):
"""Check to see if there is a primary key.
Returns bool - True - a primary key was found,
False - no primary key.
"""
primary_key = False
rows = self._get_index_list()
for row in rows:
if row[2] == "PRIMARY":
primary_key = True
return primary_key