Skip to content

Instantly share code, notes, and snippets.

@shun0102
Last active August 29, 2015 14:14
Show Gist options
  • Save shun0102/50305f103438d790b04f to your computer and use it in GitHub Desktop.
Save shun0102/50305f103438d790b04f to your computer and use it in GitHub Desktop.
embulk mysql input plugin example
module Embulk
class InputMysql < InputPlugin
require 'jdbc/mysql'
Jdbc::MySQL.load_driver
Plugin.register_input('mysql', self)
def self.transaction(config, &control)
task = {
'host' => config.param('host', :string, :default => 'localhost'),
'port' => config.param('port', :long, :default => 3306),
'database' => config.param('database', :string, :default => 0),
'table' => config.param('table', :string, :default => ''),
'user' => config.param('user', :string, :default => 'root'),
'password' => config.param('password', :string, :default => ''),
'columns' => config.param('columns', :array, :default => [])
}
threads = config.param('threads', :integer, default: 1)
columns = []
task['columns'].each_with_index do |col, index|
columns << Column.new(index, col['name'], col['type'].to_sym)
end
puts "MySQL input started."
commit_reports = yield(task, columns, threads)
puts "MySQL input finished. Commit reports = #{commit_reports.to_json}"
return {}
end
def connect(task)
url = "jdbc:mysql://#{task['host']}:#{task['port']}/#{task['database']}"
props = java.util.Properties.new
props.put("user", task['user'])
props.put("password", task['password'])
mysql = com.mysql.jdbc.Driver.new.connect(url, props)
if block_given?
begin
yield mysql
ensure
mysql.close
end
end
mysql
end
def format(value, type)
if value.nil?
return nil
elsif type == "timestamp"
unix_time = value.getTime / 1000
return Time.at(unix_time)
else
return value
end
end
def run
connect(@task) do |client|
columns = @task['columns'].map{|i| "`#{i['name']}`" }.join(",")
query = "select #{columns} from #{@task['table']}"
stmt = client.create_statement
begin
res = stmt.execute_query(query)
while (res.next) do
row = []
@task['columns'].each do |col|
row << format(res.getObject(col['name']), col['type'])
end
p row
@page_builder.add(row)
end
ensure
@page_builder.finish
stmt.close
end
end
commit_report = {}
return commit_report
end
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment