Created
February 18, 2012 23:53
-
-
Save ecavazos/1861350 to your computer and use it in GitHub Desktop.
Schedule items to be processed (based on redis sorted set)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| require 'eventmachine' | |
| require 'sorted_set_adapter' | |
# Demo worker that simulates a failing job: it takes the next item off the
# shared set, pretends to work on it, and deliberately never puts it back.
class BadJob
  # Lazily constructed adapter for the set this worker consumes.
  def set
    @set ||= SortedSetAdapter.new
  end

  # Pops one item and simulates a failed run.
  # Returns nil without doing anything when nothing could be popped.
  def process_item
    item = set.pop
    return unless item

    puts "Processing #{ item } @ #{ Time.now }"
    sleep 0.2 # stand-in for real work
    puts 'fail!'
    # does NOT add item back to set because of failure
    # set.add Time.now.to_i, item
  end

  # Starts the EventMachine reactor and attempts one item every 2 seconds.
  def perform
    EM.run { EM.add_periodic_timer(2) { process_item } }
  end
end
# Script entry point: build the failing worker and start its processing loop.
bad_worker = BadJob.new
bad_worker.perform
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| require 'eventmachine' | |
| require 'sorted_set_adapter' | |
# Worker that takes the next item off the shared set, simulates processing it,
# and then re-adds it with the current time as its new score.
class Job
  # Lazily constructed adapter for the set this worker consumes.
  def set
    @set ||= SortedSetAdapter.new
  end

  # Pops one item, "processes" it, and puts it back with a fresh score.
  # Returns nil without doing anything when nothing could be popped.
  def process_item
    item = set.pop
    return unless item

    puts "Processing #{ item } @ #{ Time.now }"
    sleep 0.2 # stand-in for real work
    set.add(Time.now.to_i, item)
  end

  # Starts the EventMachine reactor and attempts one item every 0.5 seconds.
  def perform
    EM.run { EM.add_periodic_timer(0.5) { process_item } }
  end
end
# Script entry point: build the worker and start its processing loop.
worker = Job.new
worker.perform
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # * scheduler will maintain a sorted set of items to be processed | |
| # * scheduler will try to add all items to the set in case a processor | |
| # crashes and does not add the item back | |
| # * job will remove member from set | |
| # * job will add item back to set when finished | |
| # | |
| require 'eventmachine' | |
| require 'sorted_set_adapter' | |
# Seeds the shared set with a fixed list of items, periodically prints its
# status, and re-adds any item that has gone missing from the set.
class Scheduler
  @set_name = :scheduler

  class << self
    attr_reader :set_name
  end

  # The 40 items this scheduler manages: members 1..40, each scored with the
  # current epoch second plus its zero-based index. Built once and memoized.
  def items
    @items ||= (0...40).map do |offset|
      { :member => offset + 1, :score => Time.now.to_i + offset }
    end
  end

  # Lazily constructed adapter for the set.
  def set
    @set ||= SortedSetAdapter.new
  end

  # Adds every item to the set and reports how many were added.
  def build_set
    items.each { |entry| set.add(entry[:score], entry[:member]) }
    puts "#{ items.count } items added to set."
  end

  # Re-adds (with its original score) every item that has no rank in the set,
  # then prints the current status.
  def refresh_members
    items.each do |entry|
      next if set.rank(entry[:member])

      set.add(entry[:score], entry[:member])
      puts "added #{ entry[:member] } back to set."
    end
    status
  end

  # Prints the set's size and the next five members.
  def status
    puts "#{ set.count } members in set"
    puts "next 5 items to be processed: #{ set.top(5).join ', ' }"
  end

  # Stops the reactor on INT, TERM or QUIT.
  def set_traps
    %w(INT TERM QUIT).each { |signal| trap(signal) { EM.stop } }
  end

  # Empties the set.
  def flush
    set.flush
  end

  # Runs the reactor: status every 2 seconds, membership refresh every 20.
  def run
    EM.run do
      EM.add_periodic_timer(2) { status }
      EM.add_periodic_timer(20) { refresh_members }
    end
  end

  # Full start-up sequence: clear the set, install signal handlers, seed the
  # set, then enter the reactor loop.
  def start
    flush
    set_traps
    build_set
    run
  end
end
# Script entry point: build the scheduler and run its start-up sequence.
scheduler = Scheduler.new
scheduler.start
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| require 'redis' | |
# Thin wrapper around a single Redis sorted set (stored under the class-level
# key) exposing the handful of operations the scheduler and jobs need.
class SortedSetAdapter
  @key = :scheduler

  class << self
    attr_reader :key
  end

  # Lazily constructed Redis client.
  def redis
    @redis ||= Redis.new
  end

  # Adds +member+ to the set with the given +score+.
  def add(score, member)
    redis.zadd(key, score, member)
  end

  # Removes and returns the first member of the set, or nil when the set is
  # empty.
  #
  # Reading the first member and removing it are two separate commands, so
  # another worker may remove the same member in between. When zrem reports
  # that this call did not remove it, the next first member is tried instead.
  # Iterating (rather than recursing) and stopping on an empty set prevents
  # the unbounded recursion that an empty set would otherwise cause.
  def pop
    loop do
      member = redis.zrange(key, 0, 0).first
      return nil if member.nil?
      return member if redis.zrem(key, member)
    end
  end

  # Number of members in the set.
  def count
    redis.zcard(key)
  end

  # Removes every member from the set.
  def flush
    redis.zremrangebyrank(key, 0, -1)
  end

  # The first +size+ members of the set, in rank order.
  def top(size)
    redis.zrange(key, 0, size - 1)
  end

  # Rank of +member+ as reported by ZRANK (the scheduler relies on a falsy
  # result meaning "not in the set").
  def rank(member)
    redis.zrank(key, member)
  end

  # Score of +member+ as reported by ZSCORE.
  def score(member)
    redis.zscore(key, member)
  end

  # Every member of the set, in rank order.
  def all
    redis.zrange(key, 0, -1)
  end

  private

  # Redis key of the sorted set, configured at class level.
  def key
    self.class.key
  end
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment