Skip to content

Instantly share code, notes, and snippets.

@nickcoyne
Created July 25, 2013 18:29
Show Gist options
  • Save nickcoyne/6082437 to your computer and use it in GitHub Desktop.
Save nickcoyne/6082437 to your computer and use it in GitHub Desktop.
Allow the cloudinary migration tool to run on Heroku, by using MySQL instead of sqlite.
require 'cloudinary'
require 'cloudinary/migrator'
class DbMigrator < Cloudinary::Migrator
def self.init
return if @@init
@@init = true
require 'tempfile'
end
def initialize(options={})
self.class.init
@db = ActiveRecord::Base.connection
@retrieve = options[:retrieve]
@complete = options[:complete]
@debug = options[:debug] || false
@ignore_duplicates = options[:ignore_duplicates]
@threads = [options[:threads] || 10, 100].min
@threads = 1 if RUBY_VERSION < "1.9"
@extra_options = {:api_key=>options[:api_key], :api_secret=>options[:api_secret]}
@delete_after_done = true
@max_processing = @threads * 10
@in_process = 0
@work = Queue.new
@results = Queue.new
@mutex = Mutex.new
@db.execute "
CREATE TABLE IF NOT EXISTS queue (
id INT PRIMARY KEY AUTO_INCREMENT,
internal_id INT,
public_id TEXT,
url TEXT,
metadata TEXT,
result VARCHAR(255),
status TEXT,
updated_at INT
) ENGINE=MyISAM;
"
@db.execute "
CREATE INDEX status_idx ON queue (
status(25)
);
"
@db.execute "
CREATE UNIQUE INDEX internal_id_idx ON queue (
internal_id
);
"
@db.execute "
CREATE UNIQUE INDEX public_id_idx ON queue (
public_id(25)
);
"
if options[:reset_queue]
@db.execute("DELETE FROM queue;")
end
end
def drop_table
@db.execute("DROP TABLE queue;")
end
def process(options={})
raise CloudinaryException, "url not given and no retieve callback given" if options[:url].nil? && self.retrieve.nil?
raise CloudinaryException, "id not given and retieve or complete callback given" if options[:id].nil? && (!self.retrieve.nil? || !self.complete.nil?)
debug("Process: #{options.inspect}")
start
process_results
wait_for_queue
options = options.dup
id = options.delete(:id)
url = options.delete(:url)
public_id = options.delete(:public_id)
row = {
"internal_id"=>id,
"url"=>url,
"public_id"=>public_id,
"metadata"=>options.to_json,
"status"=>"processing"
}
begin
insert_row(row)
add_to_work_queue(row)
# rescue Exception
# raise if !@ignore_duplicates
end
end
def done
start
process_all_pending
@terminate = true
1.upto(@threads){self.work << nil} # enough objects to release all waiting threads
@started = false
drop_table
@db.close
end
def max_given_id
@db.execute("SELECT MAX(internal_id) FROM queue").first[0] || 0
end
private
def update_row(row, values)
values.merge!("updated_at"=>Time.now.to_i)
result = @db.execute("UPDATE queue SET #{values.map{|k,v| "#{k}=#{@db.quote(v)}" }.join(",")} WHERE id=#{row["id"]}")
values.each{|key, value| row[key.to_s] = value}
row
end
def insert_row(values)
values.merge!("updated_at"=>Time.now.to_i)
@db.execute("INSERT INTO queue (#{values.keys.join(",")}) VALUES (#{values.values.map{|k| @db.quote(k) }.join(',')})")
values["id"] = @db.execute("SELECT LAST_INSERT_ID();").first[0]
end
def refill_queue(last_id)
@db.execute("SELECT * FROM queue WHERE status IN ('error', 'processing') AND id > #{last_id} LIMIT 10000") do |row|
last_id = row["id"] if row["id"] > last_id
wait_for_queue
add_to_work_queue(row)
end
last_id
end
def process_results
while self.results.length > 0
row = self.results.pop
result = json_decode(row["result"])
debug("Done ID=#{row['internal_id']}, result=#{result.inspect}")
complete.call(row["internal_id"], result) if complete
if result["error"]
status = case result["error"]["http_code"]
when 400, 404 then "fatal" # Problematic request. Not a server problem.
else "error"
end
else
status = "completed"
end
updates = {:status=>status, :result=>row["result"]}
updates["public_id"] = result["public_id"] if result["public_id"] && !row["public_id"]
begin
update_row(row, updates)
# rescue SQLite3::ConstraintException
# updates = {:status=>"error", :result=>{:error=>{:message=>"public_id already exists"}}.to_json}
# update_row(row, updates)
end
end
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment