Skip to content

Instantly share code, notes, and snippets.

@nstielau
Created April 3, 2011 16:11
Show Gist options
  • Save nstielau/900525 to your computer and use it in GitHub Desktop.
Save nstielau/900525 to your computer and use it in GitHub Desktop.
Task
# Example document that is periodically re-aggregated through
# Mongoid::Processable.
#
# A task becomes eligible for processing once its +aggregate_at+ timestamp is
# in the past; processing it invokes #aggregate!.
class Task
  include Mongoid::Document
  include Mongoid::Timestamps
  include Mongoid::Processable

  # The query is wrapped in a proc so that Time.now is evaluated each time the
  # query is built rather than once when the class body is loaded.
  processable :timeout => 60,
              :action => :aggregate!,
              :query => proc { { :aggregate_at => { '$lte' => Time.now.utc } } }

  field :aggregate_at, :type => Time

  # Runs the aggregation and pushes the next eligible time 60 seconds into the
  # future.
  #
  # Returns the result of +save+.
  def aggregate!
    # Must write the declared +aggregate_at+ field: it is the one the
    # processable query above filters on.
    self.aggregate_at = Time.now.utc + 60
    # Aggregate
    save
  end
end
require 'socket' # Socket.gethostname is used by ClassMethods#process_id

module Mongoid
  # Mixin that lets a Mongoid document class be worked through like a queue.
  #
  # Each document carries a +processable+ hash holding +locked_by+,
  # +locked_at+ and +last_error+. A worker claims a document with
  # ClassMethods#lock_next, runs the configured action on it and releases it
  # with #complete (or #error on failure).
  #
  # Configure the including class with ClassMethods#processable.
  module Processable
    extend ActiveSupport::Concern

    included do
      # Defaults; overridden per class through .processable.
      @action = :process
      @timeout = 60
      @query = {}

      # Lock/error bookkeeping, addressed in queries as "processable.<key>".
      # The keys live directly in this hash so new documents have the same
      # layout that the updates and queries below use.
      field :processable, :type => Hash,
                          :default => { :locked_by => nil, :locked_at => nil, :last_error => nil }
    end

    module InstanceMethods
      # Releases the lock on this document and reloads it.
      #
      # The update is filtered on the lock being held by the current
      # process_id, so it does not touch a document locked by another worker.
      # The whole +processable+ hash is replaced, which also drops any
      # previously stored +last_error+.
      def complete
        collection.master.collection.update(
          { :_id => id, 'processable.locked_by' => self.class.process_id },
          { '$set' => { 'processable' => { 'locked_by' => nil, 'locked_at' => nil } } }
        )
        reload
      end

      # Stores an optional error message on the document, releases its lock
      # and reloads it. Unlike #complete, the update is filtered on the
      # document id only, not on the lock owner.
      def error(error_message=nil)
        collection.master.collection.update(
          { :_id => id },
          { '$set' => { 'processable' => {
            'last_error' => error_message,
            'locked_by' => nil,
            'locked_at' => nil
          } } }
        )
        reload
      end
    end

    module ClassMethods
      # Releases expired locks, then keeps locking and processing documents
      # until lock_next returns nothing.
      #
      # For each document the configured action is sent to it (when the
      # document responds to it), the lock is released and the loop sleeps for
      # one second. A StandardError raised while doing so is recorded on the
      # document through #error.
      def process_loop
        cleanup_locks! # remove expired locks
        while (doc = lock_next)
          begin
            doc.send(@action) if doc.respond_to?(@action)
            doc.complete
            sleep(1)
          rescue StandardError => e
            doc.error(e.message)
          end
        end
      end

      # Identifier written into +locked_by+: host name, process id and the
      # current thread.
      def process_id
        "#{Socket.gethostname}-#{Process.pid}-#{Thread.current}"
      end

      # Returns a hash of document counts with the keys :locked, :remaining
      # (unlocked and matching the configured query), :errored and :total.
      def processable_stats
        {
          :locked => where('processable.locked_by' => /.*/).count,
          :remaining => where({ 'processable.locked_by' => nil }.merge(query)).count,
          :errored => where({ 'processable.last_error' => /.*/ }).count,
          :total => count
        }
      end

      # Claims one unlocked document matching the configured query by setting
      # its +locked_by+/+locked_at+ through find_and_modify.
      #
      # Returns the locked document, or nil when nothing could be locked
      # (find_and_modify returned nothing or raised Mongo::OperationFailure).
      def lock_next
        doc = collection.master.collection.find_and_modify(
          :query => { 'processable.locked_by' => nil, 'processable.locked_at' => nil }.merge(query),
          :update => { '$set' => { 'processable.locked_by' => process_id, 'processable.locked_at' => Time.now.utc } }
        )
        # No match may be reported as a nil result instead of an exception.
        return nil unless doc

        criteria.id(doc['_id']).first
      rescue Mongo::OperationFailure
        nil
      end

      # Removes stale locks that have exceeded the timeout.
      # Returns processable_stats.
      def cleanup_locks!
        collection.master.collection.update(
          { 'processable.locked_by' => /.*/, 'processable.locked_at' => { '$lt' => Time.now.utc - @timeout } },
          { '$set' => { 'processable.locked_by' => nil, 'processable.locked_at' => nil } },
          :multi => true
        )
        processable_stats
      end

      # Clears +last_error+ on every document that has one.
      # Returns processable_stats.
      def cleanup_errors!
        collection.master.collection.update(
          { 'processable.last_error' => /.*/ },
          { '$set' => { 'processable.last_error' => nil } },
          :multi => true
        )
        processable_stats
      end

      # Configures processing for the including class. Recognised options:
      #
      # :query   - Hash, or a Proc returning one, merged into the lookup of
      #            unlocked documents.
      # :timeout - converted with to_i; subtracted from the current time in
      #            cleanup_locks! to decide which locks are stale.
      # :action  - name of the instance method process_loop sends to each
      #            locked document.
      #
      # Options that are not given keep their current value.
      def processable(options={})
        @query = options[:query] if options[:query]
        @timeout = options[:timeout].to_i if options[:timeout]
        @action = options[:action].to_sym if options[:action]
      end

      private

      # The configured query; a Proc is called on every use so that
      # time-dependent conditions are re-evaluated.
      def query
        @query.is_a?(Proc) ? @query.call : @query
      end
    end
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment