Last active
December 17, 2015 09:29
-
-
Save boblail/5587579 to your computer and use it in GitHub Desktop.
Exports a MySQL database as XML, walks the XML stream with nokogiri, and outputs well-formed Postgres SQL
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env ruby
# encoding: utf-8
# https://gist.github.com/boblail/5587579
require 'benchmark'
require 'nokogiri'
require 'progressbar'
require 'shellwords'
require 'tempfile'
# Converts a MySQL database into INSERT statements for Postgres.
#
# The database is exported with `mysqldump --xml`; the resulting XML is
# streamed through Nokogiri's SAX parser (this class is the SAX document
# handler) and one multi-row INSERT statement is written per table.
class SqlRocket < Nokogiri::XML::SAX::Document
  # Columns for which a NULL value is reported as an error (see validate_value!).
  FIELDS_WHERE_NIL_IS_PROHIBITED = %w[created_at updated_at].freeze

  # MySQL "zero" temporal values, keyed by column type. They are converted
  # to NULL because they are not valid dates/timestamps.
  ZERO_VALUES_BY_TYPE = {
    "date"      => "0000-00-00",
    "datetime"  => "0000-00-00 00:00:00",
    "timestamp" => "0000-00-00 00:00:00"
  }.freeze

  # Column types whose values are emitted verbatim (unquoted).
  # "tinyint(1)" is handled as a boolean before this pattern is consulted.
  INTEGER_TYPES = /\A(tiny|small|medium|big)?int/

  # Column types whose values are emitted as quoted string literals.
  QUOTED_TYPES = /\A(decimal|float|double|char|varchar|text|tinytext|mediumtext|longtext|date|datetime|time)/

  # SqlRocket command API
  class << self
    # Exports a MySQL database, schema and data, to XML.
    #
    # database_name - name of the MySQL database to dump
    # output_file   - path the XML is redirected to; when nil the XML goes
    #                 to this process's stdout
    # extra_options - additional mysqldump options; defaults to every
    #                 command-line argument after the first two
    #
    # Returns true when mysqldump succeeded, false otherwise.
    def dump(database_name, output_file = nil, extra_options = ARGV.drop(2))
      $stderr.print "Dumping #{database_name}...\n"

      # Every word is shell-escaped so that names, paths and options
      # containing spaces or shell metacharacters reach mysqldump intact.
      command = Shellwords.join([
        "mysqldump", "--skip-triggers",
        "--ignore-table", "#{database_name}.schema_migrations",
        "--xml", *extra_options, database_name
      ])
      command = "#{command} > #{Shellwords.escape(output_file)}" if output_file

      succeeded = nil
      seconds = Benchmark.realtime do
        $stderr.print "#{command}\n"

        # We don't use backticks so that we don't
        # pipe stdout into Ruby. This way the shell
        # can redirect it to a file without getting
        # this script involved!
        succeeded = system(command)
      end

      if succeeded
        $stderr.puts " \e[32mfinished in #{"%.2f seconds" % seconds}\e[0m"
      else
        status = $?
        $stderr.puts " \e[31mmysqldump failed (exit status #{status ? status.exitstatus : 'unknown'})\e[0m"
      end
      succeeded ? true : false
    end

    # Converts a MySQL XML dump to INSERT statements
    # properly formatted for Postgres, written to stdout.
    #
    # xml_file - path of a file produced by `mysqldump --xml`
    def convert(xml_file)
      # Block form so the file handle is closed even if conversion raises.
      File.open(xml_file) { |file| SqlRocket.new(file).convert($stdout) }
    end

    # Chains a dump and a convert together, using a temporary file for
    # the intermediate XML.
    #
    # Returns false (without converting) when the dump failed.
    def rocket(database_name)
      xml_file = Tempfile.new(database_name)
      begin
        return false unless dump(database_name, xml_file.path)

        # mysqldump wrote to the path via the shell, so reopen the
        # tempfile to read what is now on disk.
        SqlRocket.new(xml_file.open).convert($stdout)
      ensure
        xml_file.close
        xml_file.unlink
      end
    end
  end

  # SqlRocket convert logic

  # io - a readable IO positioned at the start of the XML dump; it must
  #      respond to #size and #pos (both drive the progress bar).
  def initialize(io)
    @io = io
    @progress = ProgressBar.new("Converting", io.size)
    @parser = Nokogiri::XML::SAX::Parser.new(self)
    # @parser.replace_entities = true
    @schema_by_table = {}
    @inserts = []
  end

  attr_reader :io, :parser, :progress, :output
  attr_reader :body, :table_name, :mode, :current_schema, :values, :row_id

  # Parses the whole dump, writing SQL to `output`, then emits one
  # `setval` statement per table that has an "id" column.
  #
  # Returns true. Any error is logged (with the body of the element being
  # read) on stderr and re-raised.
  def convert(output)
    @start = Time.now
    @output = output
    parser.parse(io)
    progress.finish

    @schema_by_table.each do |table, schema|
      next unless schema.key?("id")
      # Identifiers are double-quoted to match the quoting used for the
      # table name in the INSERT statements.
      output.puts "SELECT setval('\"#{table}_id_seq\"', (SELECT MAX(id)+1 FROM \"#{table}\"), FALSE);"
    end

    $stderr.puts "\e[32mFinished in \e[1m#{format_duration(Time.now - @start)}\e[0m"
    true
  rescue => exception
    log_error exception.message
    # Diagnostics go to stderr: stdout may be the SQL stream itself.
    $stderr.puts "", body, ""
    raise
  end

  # Mode

  def read_structure?
    mode == :read_structure
  end

  def read_data?
    mode == :read_data
  end

  # Clears all per-table state; used between tables.
  def reset_mode!
    @rows = 0
    @row_id = nil
    @table_name = nil
    @mode = :continue
  end

  def read_structure!
    @mode = :read_structure
  end

  def read_data!
    @mode = :read_data
  end

  # SAX events

  def start_document
    reset_mode!
  end

  def start_element(name, attributes = {})
    # The parser may hand over attributes as [name, value] pairs.
    attributes = Hash[attributes] unless attributes.is_a?(Hash)
    @body = nil # we don't care about the body of this element

    case name
    when 'table_structure'
      read_structure!
      @table_name = attributes['name']
      @current_schema = @schema_by_table[table_name] = {}

    when 'table_data'
      read_data!
      @start_ts = Time.now
      @start_cpu = Process.times
      @table_name = attributes['name']
      @current_schema = @schema_by_table[table_name] # readonly
      @rows = 0

    when 'row'
      @row_id = nil
      @values = []

    when 'field'
      add_field_to_table_structure(attributes) if read_structure?
      if read_data?
        @body = "" # we _do_ care about the body of this element
        @attributes = attributes # save for use in end_element
      end
    end
  end

  def end_element(name)
    progress.set io.pos

    case name
    when 'table_structure'
      reset_mode!

    when 'table_data'
      # Terminates the table's single multi-row INSERT, if any row was written.
      output.print ";\n" if @rows > 0
      log_time_to_read_table
      reset_mode!

    when 'row'
      @rows += 1
      if @rows == 1
        # No parenthesis after VALUES: each row supplies its own pair, so
        # the statement reads `VALUES (..),(..)`. Wrapping the row list in
        # another pair would turn every row into a row-constructor value.
        output.print "INSERT INTO \"#{table_name}\" (#{current_schema.keys.join(",")}) VALUES "
      else
        output.print ","
      end
      output.print "(#{values.join(",")})"

    when 'field'
      if read_data?
        # Remember the row's id so log messages can point at the row.
        @row_id = body.chomp if @attributes['name'] == 'id'
        add_value_to_table_row(@attributes, @body)
      end
    end
  end

  # SAX callback: parser errors are logged, not raised.
  def error(string)
    log_error string
  end

  # SAX callback: parser warnings are logged.
  def warning(string)
    log_warning string
  end

  # SAX callback: accumulates text, but only while inside a data <field>
  # (the only time @body is non-nil).
  def characters(string)
    @body << string if @body
  end

  # Element transformation

  # Records a column (name => MySQL type) for the table being described.
  def add_field_to_table_structure(attributes)
    current_schema[attributes['Field']] = attributes['Type']
  end

  # Appends the SQL literal for one field to the current row.
  def add_value_to_table_row(attributes, body)
    type = current_schema[attributes['name']]
    @values << output_postgres_value(type, attributes, body)
  end

  # Returns the Postgres SQL literal for a field value.
  #
  # type       - the column's MySQL type string, e.g. "int(11)"
  # attributes - the <field> element's attributes
  # body       - the <field> element's text
  #
  # Raises NotImplementedError for column types that are not handled.
  def output_postgres_value(type, attributes, body)
    field = attributes['name']
    value = read_value_of(type, attributes, body)
    validate_value!(field, value, type)
    return output_nil if value.nil?

    case type
    when "tinyint(1)"  then output_postgres_boolean(value)
    when INTEGER_TYPES then value
    when QUOTED_TYPES  then output_quoted(value)
    else
      raise NotImplementedError, "Type \"#{type}\" (of field \"#{field}\") is unhandled"
    end
  end

  # Returns nil for fields flagged xsi:nil="true", otherwise the coerced value.
  def read_value_of(type, attributes, value)
    return nil if attributes['xsi:nil'] == "true"
    coerce_value(value, type)
  end

  # Maps MySQL zero dates/timestamps to nil; other values pass through.
  def coerce_value(value, type)
    return nil if ZERO_VALUES_BY_TYPE.key?(type) && ZERO_VALUES_BY_TYPE[type] == value
    value
  end

  # Logs (but does not raise) when a field that must not be NULL is nil.
  def validate_value!(field, value, type)
    if FIELDS_WHERE_NIL_IS_PROHIBITED.member?(field) && value.nil?
      log_error "NULL value is not allowed for #{table_name}.#{field}"
    end
  end

  # Postgres SQL formatting statements

  def output_nil
    "NULL"
  end

  # MySQL stores booleans as tinyint(1): '1' is true, anything else false.
  def output_postgres_boolean(value)
    output_quoted(value == '1' ? 't' : 'f')
  end

  # Returns `value` as a single-quoted SQL string literal: single quotes
  # are doubled and CRLF becomes LF. The last two substitutions rewrite
  # specific literal backslash sequences found in the text.
  def output_quoted(value)
    escaped = value
      .gsub(/'/, "''")
      .gsub(/\r\n/, "\n")
      .gsub(/\\n\\\n/, "\\n")
      .gsub(/\\xE2\\x80\\x99/, "’")
    "'#{escaped}'"
  end

  # Logging

  def log_error(message)
    log_with_tags "\e[31mERROR \e[0m#{message}"
  end

  def log_warning(message)
    log_with_tags "\e[33mWARNING \e[0m#{message}"
  end

  # Writes `message` to stderr followed by context tags: current table and
  # row id (when known), the read position, and the elapsed time (once a
  # conversion has started).
  def log_with_tags(message)
    tags = {}
    tags["table"] = table_name if table_name
    tags["id"] = row_id if row_id
    tags["pos"] = io.pos
    tags["time"] = "%.2f" % (Time.now - @start) if @start

    $stderr.print message.chomp
    $stderr.print " {"
    $stderr.print tags.map { |key, value| "#{key}: #{value}" }.join(", ")
    $stderr.print "}\n"
  end

  # Reports the row count and the wall-clock/CPU time spent on the table
  # whose <table_data> element just ended.
  def log_time_to_read_table
    end_ts = Time.now
    end_cpu = Process.times
    real_seconds = end_ts - @start_ts
    user_seconds = end_cpu.utime - @start_cpu.utime
    kernel_seconds = end_cpu.stime - @start_cpu.stime

    $stderr.print "\e[32m\e[1m#{@rows}\e[0m\e[32m inserts"
    $stderr.print " for \e[1m#{table_name}\e[0m"
    $stderr.print "\e[32m in \e[1m#{format_duration(real_seconds)}\e[0m"
    $stderr.print "\e[32m {user: %.2f, kernel: %.2f}\e[0m" % [user_seconds, kernel_seconds]
    $stderr.print "\n"
  end

  # Formats a duration given in seconds.
  #
  # Under a minute the duration is shown in seconds with two decimals.
  # Otherwise it is rounded to whole minutes first and then split into
  # hours and minutes, so the minutes part is always in 0..59.
  def format_duration(seconds)
    return "%.2f seconds" % seconds if seconds < 60

    hours, minutes = (seconds / 60).round.divmod(60)
    if hours > 0
      "#{hours} hours and #{minutes} minutes"
    else
      "#{minutes} minutes"
    end
  end
end
# CLI
#
# Usage: <script> dump|convert|rocket <argument> [mysqldump options...]
#
# All diagnostics go to stderr and failures exit non-zero, because stdout
# carries the generated XML/SQL and is typically piped or redirected.
command, arg = ARGV

unless %w[dump convert rocket].include?(command)
  abort "Uh... Don't know #{command}. Please say dump, convert or rocket!"
end

if arg.nil?
  abort "Missing argument for #{command}: give a database name (dump, rocket) or an XML file (convert)"
end

result =
  case command
  when "dump"    then SqlRocket.dump arg
  when "convert" then SqlRocket.convert arg
  when "rocket"  then SqlRocket.rocket arg
  end

# A command that explicitly returns false is treated as a failure;
# any other return value (including nil) counts as success.
exit 1 if result == false
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment