Created
April 3, 2011 16:11
-
-
Save nstielau/900525 to your computer and use it in GitHub Desktop.
Task
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Example document processed through Mongoid::Processable: a task becomes
# eligible for processing once its +aggregate_at+ timestamp is in the past.
class Task
  include Mongoid::Document
  include Mongoid::Timestamps
  include Mongoid::Processable

  # Use a 60 second lock timeout and call +aggregate!+ on each locked task.
  # The query is wrapped in a proc so that Time.now is evaluated each time the
  # query is built, not once when the class is loaded.
  processable :timeout => 60,
              :action => :aggregate!,
              :query => proc { { :aggregate_at => { '$lte' => Time.now.utc } } }

  field :aggregate_at, :type => Time

  # Runs the aggregation and schedules the next run 60 seconds from now.
  #
  # Returns the result of +save+.
  def aggregate!
    # Write to the declared field that the processable query filters on;
    # otherwise the task is never pushed into the future and stays due.
    self.aggregate_at = Time.now.utc + 60
    # Aggregate
    save
  end
end
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'socket' # Socket.gethostname is used to build the worker id

module Mongoid
  # Mixin that lets a document class act as a simple work queue.
  #
  # Lock state is kept on each document in a +processable+ hash with the keys
  # +locked_by+, +locked_at+ and +last_error+. A worker claims a document via
  # +lock_next+, runs the configured action on it, and then releases it with
  # +complete+ (success) or +error+ (failure).
  #
  # Configure the including class with the +processable+ class macro.
  module Processable
    extend ActiveSupport::Concern

    included do
      # Defaults; override them with the +processable+ class macro.
      @action = :process
      @timeout = 60
      @query = {}
      # The lock keys live directly under "processable" so that they line up
      # with the "processable.locked_by" style paths used by every query and
      # update in this module.
      field :processable, :type => Hash, :default => {:locked_by => nil, :locked_at => nil, :last_error => nil}
    end

    module InstanceMethods
      # Releases the lock held by the current process and reloads the document.
      #
      # The update only matches when this process still owns the lock. The
      # whole +processable+ hash is replaced, so any stored +last_error+ is
      # dropped as well.
      def complete
        collection.master.collection.update(
          {:_id => id, 'processable.locked_by' => self.class.process_id},
          {"$set" => {"processable" => {'locked_by' => nil, 'locked_at' => nil}}}
        )
        reload
      end

      # Records an error on the document and releases its lock.
      #
      # error_message - optional message stored as +processable.last_error+.
      #
      # Unlike +complete+, the update is matched by +_id+ only and does not
      # check which process owns the lock.
      def error(error_message=nil)
        collection.master.collection.update(
          {:_id => id},
          {"$set" => {"processable" => {
            'last_error' => error_message,
            'locked_by' => nil,
            'locked_at' => nil
          }}}
        )
        reload
      end
    end

    module ClassMethods
      # Clears expired locks, then locks and processes documents one at a time
      # until +lock_next+ returns nothing.
      #
      # A StandardError raised while handling a document is stored on that
      # document via +error+ and the loop continues.
      def process_loop
        cleanup_locks! # remove expired locks
        while (doc = lock_next)
          begin
            doc.send(@action) if doc.respond_to?(@action)
            doc.complete
            sleep(1)
          rescue StandardError => e
            doc.error(e.message)
          end
        end
      end

      # Identifier written into +processable.locked_by+, built from the host
      # name, the process id and the current thread.
      def process_id
        "#{Socket.gethostname}-#{Process.pid}-#{Thread.current}"
      end

      # Returns a hash of counts with the keys :locked, :remaining, :errored
      # and :total.
      def processable_stats
        {
          :locked => where("processable.locked_by" => /.*/).count,
          :remaining => where({"processable.locked_by" => nil}.merge(query)).count,
          :errored => where({"processable.last_error" => /.*/}).count,
          :total => count
        }
      end

      # Locks one unlocked document matching the configured query and returns
      # it as a model instance, or nil when nothing could be locked.
      def lock_next
        doc = collection.master.collection.find_and_modify(
          :query => {"processable.locked_by" => nil, "processable.locked_at" => nil}.merge(query),
          :update => {"$set" => {"processable.locked_by" => process_id, "processable.locked_at" => Time.now.utc}}
        )
        # Guard against a nil result so that an empty queue ends the loop
        # instead of raising NoMethodError on doc["_id"].
        return nil unless doc

        criteria.id(doc["_id"]).first
      rescue Mongo::OperationFailure
        # NOTE(review): presumably raised by the driver when no document
        # matches -- confirm against the driver version in use.
        nil
      end

      # Removes stale locks that have exceeded the timeout.
      #
      # Returns the current +processable_stats+.
      def cleanup_locks!
        stale = {"processable.locked_by" => /.*/, "processable.locked_at" => {'$lt' => Time.now.utc - @timeout}}
        collection.master.collection.update(
          stale,
          {"$set" => {"processable.locked_by" => nil, "processable.locked_at" => nil}},
          :multi => true
        )
        processable_stats
      end

      # Clears +processable.last_error+ on every document that has one.
      #
      # Returns the current +processable_stats+.
      def cleanup_errors!
        collection.master.collection.update(
          {"processable.last_error" => /.*/},
          {"$set" => {"processable.last_error" => nil}},
          :multi => true
        )
        processable_stats
      end

      # Class macro that configures processing for the including class.
      #
      # options - Hash of settings; keys that are absent keep their defaults:
      #           :query   - Hash, or a Proc returning a Hash, merged into the
      #                      lock and stats queries.
      #           :timeout - lock timeout, subtracted from the current time in
      #                      +cleanup_locks!+ (converted with +to_i+).
      #           :action  - name of the instance method that +process_loop+
      #                      sends to each locked document.
      def processable(options={})
        @query = options[:query] if options[:query]
        @timeout = options[:timeout].to_i if options[:timeout]
        @action = options[:action].to_sym if options[:action]
      end

      private

      # Resolves the configured query, calling it first when it is a Proc so
      # that time-dependent conditions are evaluated at call time.
      def query
        @query.is_a?(Proc) ? @query.call : @query
      end
    end
  end
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment