Skip to content

Instantly share code, notes, and snippets.

@boblail
Last active December 17, 2015 09:29
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save boblail/5587579 to your computer and use it in GitHub Desktop.
Save boblail/5587579 to your computer and use it in GitHub Desktop.
Exports a MySQL database as XML, walks the XML stream with Nokogiri's SAX parser, and outputs well-formed Postgres SQL (INSERT statements plus sequence resets)
#!/usr/bin/env ruby
# encoding: utf-8
# https://gist.github.com/boblail/5587579
require 'benchmark'
require 'nokogiri'
require 'progressbar'
require 'tempfile'
# Streams a `mysqldump --xml` export through Nokogiri's SAX parser and
# writes Postgres INSERT statements for its data.
#
# <table_structure> elements are remembered (column name => MySQL type) so
# each value in the matching <table_data> can be formatted for its column
# type. All rows of one table are written as a single multi-row INSERT.
class SqlRocket < Nokogiri::XML::SAX::Document
  # Columns for which a NULL value is logged as an error (it is still emitted).
  FIELDS_WHERE_NIL_IS_PROHIBITED = %w{created_at updated_at}.freeze

  # MySQL integer types (int, tinyint, smallint, mediumint, bigint, with any
  # display width / "unsigned"); their values are emitted unquoted.
  # tinyint(1) is matched first, separately, and treated as a boolean.
  INTEGER_TYPE = /\A(tiny|small|medium|big)?int/

  # MySQL types whose values are emitted as quoted string literals, leaving
  # Postgres to coerce them to the target column's type.
  QUOTED_TYPE = /\A(decimal|numeric|float|double|real|char|varchar|tinytext|text|mediumtext|longtext|enum|json|date|datetime|timestamp|time|year)/

  # date, datetime[(n)] and timestamp[(n)] columns may hold MySQL zero dates.
  ZERO_DATE_TYPE = /\A(date|timestamp)/

  # MySQL "zero" date/datetime values, which Postgres cannot represent.
  ZERO_DATE = /\A0000-00-00( 00:00:00(\.0+)?)?\z/

  # SqlRocket command API
  class << self
    # Exports a MySQL database, schema and data, to XML with mysqldump.
    #
    # database_name - the MySQL database to dump.
    # output_file   - path the shell redirects the XML to; when nil the XML
    #                 goes to this process's stdout.
    # extra_args    - extra mysqldump options, joined into the shell command
    #                 as-is (defaults to the CLI arguments after the database
    #                 name).
    #
    # Raises RuntimeError if mysqldump could not be run or exited non-zero.
    def dump(database_name, output_file = nil, extra_args: ARGV.drop(2))
      $stderr.print "Dumping #{database_name}...\n"
      command = "mysqldump --skip-triggers --ignore-table #{database_name}.schema_migrations --xml #{extra_args.join(" ")} #{database_name}"
      command << " > #{output_file}" if output_file

      succeeded = nil
      seconds = Benchmark.realtime do
        $stderr.print "#{command}\n"
        # We don't use backticks so that we don't pipe stdout into Ruby.
        # This way the shell can redirect it to a file without getting
        # this script involved!
        succeeded = system(command)
      end
      # system returns nil if the command could not run and false if it
      # exited non-zero; either way there is no usable dump to convert.
      raise "mysqldump failed: #{$?}" unless succeeded

      $stderr.puts " \e[32mfinished in #{"%.2f seconds" % seconds}\e[0m"
    end

    # Converts a MySQL XML dump file to INSERT statements properly formatted
    # for Postgres, written to $stdout. The file is closed afterwards, even
    # if the conversion raises. Returns true on success.
    def convert(xml_file)
      File.open(xml_file) { |file| new(file).convert($stdout) }
    end

    # Chains a dump and a convert together through a temporary XML file.
    # The temporary file is closed and deleted even if either step raises.
    def rocket(database_name)
      xml_file = Tempfile.new(database_name)
      begin
        dump(database_name, xml_file.path)
        new(xml_file.open).convert($stdout)
      ensure
        xml_file.close
        xml_file.unlink
      end
    end
  end

  # SqlRocket convert logic

  # io - readable IO over the XML dump; its size sizes the progress bar and
  #      its position drives it.
  def initialize(io)
    @io = io
    @progress = ProgressBar.new("Converting", io.size)
    @parser = Nokogiri::XML::SAX::Parser.new(self)
    # @parser.replace_entities = true
    @schema_by_table = {}
  end

  attr_reader :io, :parser, :progress, :output
  attr_reader :body, :table_name, :mode, :current_schema, :values, :row_id

  # Parses the whole dump, printing INSERT statements to +output+. Then, for
  # every table with an "id" column, prints a setval so the table's sequence
  # hands out MAX(id)+1 next (assumes sequences named <table>_id_seq — TODO
  # confirm against the target schema). Returns true.
  #
  # On any error, logs the message plus the body of the element being read
  # to $stderr and re-raises.
  def convert(output)
    @start = Time.now
    @output = output
    parser.parse(io)
    progress.finish
    @schema_by_table.each do |table, schema|
      next unless schema.key?("id")
      sequence = quote_literal(quote_identifier("#{table}_id_seq"))
      output.puts "SELECT setval(#{sequence}, (SELECT MAX(id)+1 FROM #{quote_identifier(table)}), FALSE);"
    end
    $stderr.puts "\e[32mFinished in \e[1m#{format_duration(Time.now - @start)}\e[0m"
    true
  rescue
    log_error $!.message
    # stdout carries the generated SQL, so diagnostics must go to stderr.
    $stderr.puts "", body, ""
    raise
  end

  # Mode

  # True while inside a <table_structure> element.
  def read_structure?
    mode == :read_structure
  end

  # True while inside a <table_data> element.
  def read_data?
    mode == :read_data
  end

  # Leaves table context: clears the row count, current row id and table.
  def reset_mode!
    @rows = 0
    @row_id = nil
    @table_name = nil
    @mode = :continue
  end

  def read_structure!
    @mode = :read_structure
  end

  def read_data!
    @mode = :read_data
  end

  # SAX events

  def start_document
    reset_mode!
  end

  # Tracks which table/row/field is being read. +attributes+ may arrive as
  # an array of [name, value] pairs; it is normalised to a Hash.
  def start_element(name, attributes={})
    attributes = Hash[attributes] unless Hash === attributes
    @body = nil # we don't care about the body of this element
    case name
    when 'table_structure'
      read_structure!
      @table_name = attributes['name']
      @current_schema = @schema_by_table[table_name] = {}
    when 'table_data'
      read_data!
      @start_ts = Time.now
      @start_cpu = Process.times
      @table_name = attributes['name']
      @current_schema = @schema_by_table[table_name] # readonly
      @rows = 0
    when 'row'
      @row_id = nil
      @values = []
    when 'field'
      add_field_to_table_structure(attributes) if read_structure?
      if read_data?
        @body = "" # we _do_ care about the body of this element
        @attributes = attributes # save for use in end_element
      end
    end
  end

  # Emits SQL as elements close: the first row of a table opens its INSERT,
  # later rows are appended with "," and </table_data> terminates it with ";".
  def end_element(name)
    progress.set io.pos
    case name
    when 'table_structure'
      reset_mode!
    when 'table_data'
      output.puts ";" if @rows > 0
      log_time_to_read_table
      reset_mode!
    when 'row'
      @rows += 1
      if @rows == 1
        columns = current_schema.keys.map { |column| quote_identifier(column) }
        # VALUES is followed directly by the row tuples: an extra pair of
        # parentheses would turn every row into a single record value.
        output.print "INSERT INTO #{quote_identifier(table_name)} (#{columns.join(",")}) VALUES "
      else
        output.print ","
      end
      output.print "(#{values.join(",")})"
    when 'field'
      if read_data?
        @row_id = body.chomp if @attributes['name'] == 'id'
        add_value_to_table_row(@attributes, @body)
      end
    end
  end

  def error(string)
    log_error string
  end

  def warning(string)
    log_warning string
  end

  # Accumulates text only while a data <field> is open (@body non-nil).
  def characters(string)
    @body << string unless @body.nil?
  end

  # Element transformation

  # Records the MySQL type of one column of the current table.
  def add_field_to_table_structure(attributes)
    current_schema[attributes['Field']] = attributes['Type']
  end

  # Formats one field of the current row and appends it to the row's values.
  def add_value_to_table_row(attributes, body)
    type = current_schema[attributes['name']]
    @values << output_postgres_value(type, attributes, body)
  end

  # Returns the SQL literal for one field value, chosen by its MySQL +type+.
  # Raises NotImplementedError for types with no known mapping (e.g. blobs).
  def output_postgres_value(type, attributes, body)
    field = attributes['name']
    value = read_value_of(type, attributes, body)
    validate_value!(field, value, type)
    return output_nil if value.nil?
    return output_postgres_boolean(value) if type == "tinyint(1)"
    return value if type =~ INTEGER_TYPE
    return output_quoted(value) if type =~ QUOTED_TYPE
    raise NotImplementedError, "Type \"#{type}\" (of field \"#{field}\") is unhandled"
  end

  # nil when the field is marked xsi:nil="true", else the coerced value.
  def read_value_of(type, attributes, value)
    return nil if attributes['xsi:nil'] == "true"
    coerce_value(value, type)
  end

  # Maps MySQL zero dates (invalid in Postgres) to nil; other values pass through.
  def coerce_value(value, type)
    return nil if type =~ ZERO_DATE_TYPE && value =~ ZERO_DATE
    value
  end

  # Logs (does not raise) when a NULL shows up in a prohibited column.
  def validate_value!(field, value, type)
    if FIELDS_WHERE_NIL_IS_PROHIBITED.member?(field) && value.nil?
      log_error "NULL value is not allowed for #{table_name}.#{field}"
    end
  end

  # Postgres SQL formatting statements

  def output_nil
    "NULL"
  end

  # MySQL stores booleans as 0/1; Postgres accepts 't'/'f'.
  def output_postgres_boolean(value)
    output_quoted(value == '1' ? 't' : 'f')
  end

  # Quotes a data value as a string literal, doubling single quotes and
  # applying a few fixups (CRLF -> LF and two literal-escape clean-ups).
  def output_quoted(value)
    escaped = value
      .gsub(/'/, "''")
      .gsub(/\r\n/, "\n")
      .gsub(/\\n\\\n/, "\\n")
      .gsub(/\\xE2\\x80\\x99/, "’")
    "'#{escaped}'"
  end

  # Quotes +name+ as a Postgres identifier, doubling embedded double quotes.
  def quote_identifier(name)
    "\"#{name.to_s.gsub('"', '""')}\""
  end

  # Quotes +value+ as a plain Postgres string literal (no data fixups).
  def quote_literal(value)
    "'#{value.gsub("'", "''")}'"
  end

  # Logging

  def log_error(message)
    log_with_tags "\e[31mERROR \e[0m#{message}"
  end

  def log_warning(message)
    log_with_tags "\e[33mWARNING \e[0m#{message}"
  end

  # Writes +message+ to $stderr followed by the current table, row id, input
  # position and seconds since convert started.
  def log_with_tags(message)
    $stderr.print message.chomp
    hash = {}
    hash["table"] = table_name if table_name
    hash["id"] = row_id if row_id
    hash["pos"] = io.pos
    hash["time"] = "%.2f" % (Time.now - @start)
    $stderr.print " {"
    $stderr.print hash.map { |key, value| "#{key}: #{value}" }.join(", ")
    $stderr.print "}\n"
  end

  # Reports row count, wall time and user/kernel CPU time for the table just read.
  def log_time_to_read_table
    end_ts = Time.now
    end_cpu = Process.times
    real_seconds = end_ts - @start_ts
    user_seconds = end_cpu.utime - @start_cpu.utime
    kernel_seconds = end_cpu.stime - @start_cpu.stime
    $stderr.print "\e[32m\e[1m#{@rows}\e[0m\e[32m inserts"
    $stderr.print " for \e[1m#{table_name}\e[0m"
    $stderr.print "\e[32m in \e[1m#{format_duration(real_seconds)}\e[0m"
    $stderr.print "\e[32m {user: %.2f, kernel: %.2f}\e[0m" % [user_seconds, kernel_seconds]
    $stderr.print "\n"
  end

  # Human-readable duration. Hours and minutes come from the rounded total
  # minutes, so the minute count never reaches 60.
  def format_duration(seconds)
    hours, minutes = (seconds / 60.0).round.divmod(60)
    if hours > 0
      "#{hours} hours and #{minutes} minutes"
    elsif minutes > 0
      "#{minutes} minutes"
    else
      "%.2f seconds" % seconds
    end
  end
end
# CLI: COMMAND TARGET [MYSQLDUMP_OPTIONS...]
# Only runs when this file is executed directly, so SqlRocket can also be
# required as a library. Errors go to stderr (stdout carries the SQL) and
# exit non-zero.
if $PROGRAM_NAME == __FILE__
  command, arg = ARGV
  if %w[dump convert rocket].include?(command) && arg.nil?
    abort "Please give #{command} a #{command == "convert" ? "XML file" : "database name"}!"
  end
  case command
  when "dump" then SqlRocket.dump arg
  when "convert" then SqlRocket.convert arg
  when "rocket" then SqlRocket.rocket arg
  else abort "Uh... Don't know #{command}. Please say dump, convert or rocket!"
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment