Created
July 25, 2013 18:29
-
-
Save nickcoyne/6082437 to your computer and use it in GitHub Desktop.
Allow the Cloudinary migration tool to run on Heroku by using MySQL (through ActiveRecord) instead of SQLite.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'cloudinary' | |
require 'cloudinary/migrator' | |
# Variant of Cloudinary::Migrator that keeps its work queue in a table named
# +queue+ on the application's ActiveRecord connection (MySQL syntax) instead
# of a local SQLite file.
#
# The helpers called below but not defined here (+start+, +wait_for_queue+,
# +add_to_work_queue+, +process_all_pending+, +debug+, +json_decode+ and the
# +retrieve+ / +complete+ / +work+ / +results+ readers) are presumably
# inherited from Cloudinary::Migrator -- confirm against the installed gem.
class DbMigrator < Cloudinary::Migrator
  # One-time, process-wide setup. Overrides the parent hook and only loads
  # +tempfile+.
  #
  # NOTE(review): @@init is never initialised in this class; it is assumed to
  # be declared by Cloudinary::Migrator (class variables are shared with the
  # superclass). Confirm before reusing this class with another parent.
  def self.init
    return if @@init
    @@init = true
    require 'tempfile'
  end

  # Builds a migrator bound to ActiveRecord::Base.connection and makes sure
  # the +queue+ table exists.
  #
  # Recognised +options+ keys:
  #   :retrieve, :complete   - stored as callbacks (see #process and
  #                            #process_results for how they are used here)
  #   :debug                 - stored as @debug, defaults to false
  #   :ignore_duplicates     - when truthy, #process silently skips rows that
  #                            violate a unique index
  #   :threads               - defaults to 10, capped at 100, forced to 1 on
  #                            Ruby < 1.9
  #   :api_key, :api_secret  - stored in @extra_options
  #   :reset_queue           - when truthy, all rows of +queue+ are deleted
  def initialize(options={})
    self.class.init

    @db = ActiveRecord::Base.connection
    @retrieve = options[:retrieve]
    @complete = options[:complete]
    @debug = options[:debug] || false
    @ignore_duplicates = options[:ignore_duplicates]
    @threads = [options[:threads] || 10, 100].min
    @threads = 1 if RUBY_VERSION < "1.9"
    @extra_options = {:api_key=>options[:api_key], :api_secret=>options[:api_secret]}
    @delete_after_done = true
    @max_processing = @threads * 10
    @in_process = 0
    @work = Queue.new
    @results = Queue.new
    @mutex = Mutex.new

    create_queue_table
    @db.execute("DELETE FROM queue") if options[:reset_queue]
  end

  # Drops the +queue+ table. IF EXISTS keeps this safe to call when the table
  # has already been removed.
  def drop_table
    @db.execute("DROP TABLE IF EXISTS queue")
  end

  # Records one item in the +queue+ table with status "processing" and hands
  # it to the work queue.
  #
  # +options+ keys :id, :url and :public_id are stored in their own columns;
  # every remaining key is serialised to JSON in the +metadata+ column.
  #
  # Raises CloudinaryException when :url is missing and no retrieve callback
  # is set, or when :id is missing while a retrieve or complete callback is
  # set. Raises ActiveRecord::RecordNotUnique for a duplicate row unless the
  # migrator was created with :ignore_duplicates.
  def process(options={})
    raise CloudinaryException, "url not given and no retieve callback given" if options[:url].nil? && self.retrieve.nil?
    raise CloudinaryException, "id not given and retieve or complete callback given" if options[:id].nil? && (!self.retrieve.nil? || !self.complete.nil?)

    debug("Process: #{options.inspect}")
    start
    process_results
    wait_for_queue

    # Work on a copy so the caller's hash is not emptied by the deletes below.
    options = options.dup
    id = options.delete(:id)
    url = options.delete(:url)
    public_id = options.delete(:public_id)
    row = {
      "internal_id"=>id,
      "url"=>url,
      "public_id"=>public_id,
      "metadata"=>options.to_json,
      "status"=>"processing"
    }

    begin
      insert_row(row)
      add_to_work_queue(row)
    rescue ActiveRecord::RecordNotUnique
      # A unique index on internal_id / public_id rejected the insert.
      raise unless @ignore_duplicates
    end
  end

  # Finishes the run: waits for pending work, releases the worker threads,
  # drops the +queue+ table and closes the connection handle.
  def done
    start
    process_all_pending
    @terminate = true
    1.upto(@threads) { self.work << nil } # enough objects to release all waiting threads
    @started = false
    drop_table
    @db.close
  end

  # Returns the largest +internal_id+ stored in the queue, or 0 when the
  # table is empty.
  def max_given_id
    @db.select_value("SELECT MAX(internal_id) FROM queue").to_i
  end

  private

  # Creates the +queue+ table together with its indexes in one statement.
  # Declaring the indexes inline means a table left behind by an interrupted
  # run does not make a later initialisation fail on "duplicate key name",
  # which separate CREATE INDEX statements would.
  #
  # +result+ is TEXT because it stores the raw JSON result string, which may
  # be longer than 255 characters.
  #
  # NOTE(review): public_id_idx is unique on the first 25 characters only, so
  # two distinct public ids sharing a 25-character prefix are treated as
  # duplicates. Kept as in the original schema -- confirm this is acceptable.
  def create_queue_table
    @db.execute <<-SQL
      CREATE TABLE IF NOT EXISTS queue (
        id INT PRIMARY KEY AUTO_INCREMENT,
        internal_id INT,
        public_id TEXT,
        url TEXT,
        metadata TEXT,
        result TEXT,
        status TEXT,
        updated_at INT,
        KEY status_idx (status(25)),
        UNIQUE KEY internal_id_idx (internal_id),
        UNIQUE KEY public_id_idx (public_id(25))
      ) ENGINE=MyISAM
    SQL
  end

  # Writes +values+ (column => value) to the database row identified by
  # row["id"], stamps +updated_at+, mirrors the new values into +row+ under
  # string keys and returns +row+. Note that +values+ itself gains an
  # "updated_at" entry.
  def update_row(row, values)
    values.merge!("updated_at"=>Time.now.to_i)
    assignments = values.map { |column, value| "#{column}=#{@db.quote(value)}" }.join(",")
    @db.execute("UPDATE queue SET #{assignments} WHERE id=#{@db.quote(row["id"])}")
    values.each { |key, value| row[key.to_s] = value }
    row
  end

  # Inserts +values+ (column => value) as a new queue row. The hash is
  # updated in place: it gains "updated_at" and the generated "id", which is
  # also the return value.
  def insert_row(values)
    values.merge!("updated_at"=>Time.now.to_i)
    columns = values.keys.join(",")
    quoted = values.values.map { |value| @db.quote(value) }.join(",")
    @db.execute("INSERT INTO queue (#{columns}) VALUES (#{quoted})")
    # LAST_INSERT_ID() is per connection, and @db is the connection that just
    # ran the INSERT.
    values["id"] = @db.select_value("SELECT LAST_INSERT_ID()").to_i
  end

  # Re-queues up to 10000 rows whose status is "error" or "processing" and
  # whose id is greater than +last_id+. Returns the highest id seen (or
  # +last_id+ when nothing matched) so the caller can continue from there.
  #
  # select_all is used because the connection's +execute+ does not yield rows
  # to a block; ORDER BY id makes the returned cursor safe to page with.
  def refill_queue(last_id)
    sql = "SELECT * FROM queue WHERE status IN ('error', 'processing') " \
          "AND id > #{last_id.to_i} ORDER BY id LIMIT 10000"
    @db.select_all(sql).each do |row|
      row_id = row["id"].to_i
      last_id = row_id if row_id > last_id
      wait_for_queue
      add_to_work_queue(row)
    end
    last_id
  end

  # Drains the results queue: decodes each row's JSON result, invokes the
  # +complete+ callback when one is set, and stores the final status.
  #
  # Status is "completed" when the result has no "error" entry, "fatal" when
  # the error's http_code is 400 or 404, and "error" otherwise.
  def process_results
    while self.results.length > 0
      row = self.results.pop
      result = json_decode(row["result"])
      debug("Done ID=#{row['internal_id']}, result=#{result.inspect}")
      complete.call(row["internal_id"], result) if complete

      status =
        if result["error"]
          case result["error"]["http_code"]
          when 400, 404 then "fatal" # Problematic request. Not a server problem.
          else "error"
          end
        else
          "completed"
        end

      updates = {:status=>status, :result=>row["result"]}
      # Record the public id from the result only when none was supplied.
      updates["public_id"] = result["public_id"] if result["public_id"] && !row["public_id"]

      begin
        update_row(row, updates)
      rescue ActiveRecord::RecordNotUnique
        # The public id collides with another row: record that on this row
        # instead of aborting the whole run.
        updates = {:status=>"error", :result=>{:error=>{:message=>"public_id already exists"}}.to_json}
        update_row(row, updates)
      end
    end
  end
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment