Skip to content

Instantly share code, notes, and snippets.

@veer66
Last active September 4, 2015 02:45
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save veer66/62cee91eaacfbd661b9b to your computer and use it in GitHub Desktop.
Save veer66/62cee91eaacfbd661b9b to your computer and use it in GitHub Desktop.
A script for converting a MySQL Workbench model (.mwb file) into SQLAlchemy classes.
#
# mysqlworkbench_to_sqlalchemy.rb
#
# Copyright (c) 2015 Vee Satayamas
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
#
require 'zip'
require 'rexml/document'
include REXML
# Opens the Workbench archive at +mwb_path+ and hands the contents of its
# 'document.mwb.xml' entry to convert_xml.
#
# Only the first entry with that name is processed; when the archive has no
# such entry nothing is converted.
def convert(mwb_path)
  Zip::File.open(mwb_path) do |archive|
    archive.each do |member|
      next unless member.name == 'document.mwb.xml'

      convert_xml(member.get_input_stream.read)
      break
    end
  end
end
# Parses the model XML, collects every db.mysql.Table element as an
# entity hash ({name:, columns:}) and passes the entities, the primary-key
# index and the foreign-key links to gen_entities.
#
# xml - String holding the XML document.
def convert_xml(xml)
  model_root = Document.new(xml).root
  table_els = model_root.get_elements('//value[@struct-name="db.mysql.Table"]')
  entities = table_els.map do |table_el|
    table_name = table_el.get_elements('value[@key="name"]').first.text
    {name: table_name, columns: extract_columns(table_el)}
  end
  gen_entities(entities, extract_indice(model_root), find_links(entities, model_root))
end
# Builds the foreign-key lookup table.
#
# entities - Array of entity hashes ({name:, columns: [{id:, name:, type:}]}).
# root     - REXML element used as the context for the ForeignKey XPath query.
#
# Returns a Hash mapping the id of each referencing column to
# {entry: <referenced entity>, column: <referenced column>} (the value is nil
# when the referenced id does not belong to any known column).
#
# Every column pair of a composite foreign key is recorded, and foreign keys
# without column links are skipped instead of raising NoMethodError.
def find_links(entities, root)
  # Index every known column by its Workbench object id.
  columns_by_id = {}
  entities.each do |entity|
    entity[:columns].each do |col|
      columns_by_id[col[:id]] = {entry: entity, column: col}
    end
  end

  key_tab = {}
  root.each_element('//value[@struct-name="db.mysql.ForeignKey"]') do |fk_el|
    local_links = fk_el.get_elements('value[@key="columns"]/link')
    target_links = fk_el.get_elements('value[@key="referencedColumns"]/link')
    # Pair the n-th referencing column with the n-th referenced column.
    local_links.zip(target_links).each do |local_el, target_el|
      next unless target_el

      local_id = local_el.text
      target_id = target_el.text
      key_tab[local_id] = columns_by_id[target_id] if local_id && target_id
    end
  end
  key_tab
end
# Collects the ids of all primary-key columns.
#
# root - REXML element used as the context for the Index XPath query.
#
# Returns {primary_keys: {column_id => true, ...}}.
#
# Every referencedColumn of a primary index is recorded, so composite primary
# keys keep all of their columns (not only the first one).
def extract_indice(root)
  primary_keys = {}
  root.each_element('//value[@struct-name="db.mysql.Index"]') do |idx_el|
    is_primary_el = idx_el.get_elements('value[@key="isPrimary"]')[0]
    next unless is_primary_el && is_primary_el.text == "1"

    idx_el.get_elements('value/value/link[@key="referencedColumn"]').each do |ref_el|
      primary_keys[ref_el.text] = true
    end
  end
  {primary_keys: primary_keys}
end
# Emits every entity in order by delegating to gen_entity.
#
# entities - Array of entity hashes.
# idx      - index information, forwarded unchanged.
# links    - foreign-key table, forwarded unchanged.
def gen_entities(entities, idx, links)
  entities.each { |item| gen_entity(item, idx, links) }
end
# Prints one class definition: the class header, the __tablename__
# assignment, the column lines (via gen_columns) and a trailing blank line.
#
# entity - Hash with :name and :columns.
def gen_entity(entity, idx, links)
  table_name = entity[:name]
  puts "class #{table_name}(Base):", " __tablename__ = \"#{table_name}\""
  gen_columns(entity[:columns], idx, links)
  puts
end
# Prints one line per column by delegating to gen_column, in list order.
def gen_columns(columns, idx, links)
  columns.each do |column|
    gen_column(column, idx, links)
  end
end
# Maps a MySQL Workbench type name to the SQLAlchemy type name that is written
# into the generated Column(...) call. Types missing from this table make
# gen_column raise.
#
# DECIMAL maps to Numeric (not Integer) so fractional values are preserved.
$type_tab = {"int" => "Integer",
             "smallint" => "SmallInteger",
             "bigint" => "BigInteger",
             "decimal" => "Numeric",
             "bool" => "Boolean",
             "text" => "Text",
             "varchar" => "String",
             "date" => "Date",
             "datetime" => "DateTime",
             "datetime_f" => "DateTime",
             "float" => "Float",
             "double" => "Float"}
# Prints the Column(...) assignment for a single column.
#
# col   - Hash with :id, :name and :type (:type must be a key of $type_tab).
# idx   - Hash whose :primary_keys entry has the ids of primary-key columns as keys.
# links - Hash mapping a column id to the {entry:, column:} it references.
#
# Raises RuntimeError when the column type has no entry in $type_tab.
def gen_column(col, idx, links)
  name = col[:name]
  type = $type_tab[col[:type]]
  raise "Type: #{col[:type]} is not supported" unless type

  col_id = col[:id]
  is_primary_key = idx[:primary_keys].has_key?(col_id)
  link = links[col_id]

  add = ""
  # ForeignKey(...) is a positional argument, so it has to be written before
  # the primary_key=True keyword argument; the other order is a Python
  # SyntaxError for columns that are both a primary key and a foreign key.
  add += " ,ForeignKey('#{link[:entry][:name]}.#{link[:column][:name]}')" if link
  add += " ,primary_key=True" if is_primary_key
  puts " #{name} = Column(#{type}#{add})"
end
# Reads the column definitions of one table element.
#
# tab - REXML element of a table; its first child list marked with
#       content-struct-name="db.mysql.Column" is used.
#
# Returns an Array of {id:, name:, type:} hashes, where :type is the last
# dot-separated segment of the simpleType link text (or of the userType link
# when there is no simpleType link).
def extract_columns(tab)
  list_el = tab.get_elements('value[@content-struct-name="db.mysql.Column"]').first
  column_els = list_el.get_elements('value[@struct-name="db.mysql.Column"]')
  column_els.map do |column_el|
    type_link = column_el.get_elements('link[@key="simpleType"]').first ||
                column_el.get_elements('link[@key="userType"]').first
    {
      id: column_el.attribute("id").value,
      name: column_el.get_elements('value[@key="name"]').first.text,
      type: type_link.text.split('.').last
    }
  end
end
# Command-line entry point: expects exactly one argument, the path of the
# .mwb file. Prints a usage line and exits with status 1 otherwise.
def main
  unless ARGV.length == 1
    puts "Usage: ruby #{$0} <mwb_path>"
    exit 1
  end

  convert(ARGV[0])
end
# Run main only when this file is executed directly, not when it is required.
main if __FILE__ == $PROGRAM_NAME
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment