Skip to content

Instantly share code, notes, and snippets.

@ecavazos
Created February 18, 2012 23:53
Show Gist options
  • Save ecavazos/1861350 to your computer and use it in GitHub Desktop.
Save ecavazos/1861350 to your computer and use it in GitHub Desktop.
Schedule items to be processed (based on redis sorted set)
require 'eventmachine'
require 'sorted_set_adapter'
class BadJob
def set
@set ||= SortedSetAdapter.new
end
def process_item
return unless item = set.pop
puts "Processing #{ item } @ #{ Time.now }"
sleep 0.2
puts 'fail!'
# does NOT add item back to set because of failure
# set.add Time.now.to_i, item
end
def perform
EM.run do
EM.add_periodic_timer(2) { process_item }
end
end
end
BadJob.new.perform
require 'eventmachine'
require 'sorted_set_adapter'
class Job
def set
@set ||= SortedSetAdapter.new
end
def process_item
return unless item = set.pop
puts "Processing #{ item } @ #{ Time.now }"
sleep 0.2
set.add Time.now.to_i, item
end
def perform
EM.run do
EM.add_periodic_timer(0.5) { process_item }
end
end
end
Job.new.perform
# * scheduler will maintain an sorted set of items to be processed
# * scheduler will try to add all items to the set in case a processor
# crashes and does not add the item back
# * job will remove member from set
# * job will add item back to set when finished
#
require 'eventmachine'
require 'sorted_set_adapter'
class Scheduler
@set_name = :scheduler
class << self
attr :set_name
end
def items
@items ||= Array.new(40) { |i| { :member => i + 1, :score => Time.now.to_i + i } }
end
def set
@set ||= SortedSetAdapter.new
end
def build_set
items.each do |item|
set.add item[:score], item[:member]
end
puts "#{ items.count } items added to set."
end
def refresh_members
items.each do |item|
unless set.rank item[:member]
set.add item[:score], item[:member]
puts "added #{ item[:member] } back to set."
end
end
status
end
def status
puts "#{ set.count } members in set"
puts "next 5 items to be processed: #{ set.top(5).join ', ' }"
end
def set_traps
%w(INT TERM QUIT).each do |sig|
trap(sig) { EM.stop }
end
end
def flush
set.flush
end
def run
EM.run do
EM.add_periodic_timer(2) { status }
EM.add_periodic_timer(20) { refresh_members }
end
end
def start
flush
set_traps
build_set
run
end
end
Scheduler.new.start
require 'redis'
class SortedSetAdapter
@key = :scheduler
class << self
attr :key
end
def redis
@redis ||= Redis.new
end
def add score, member
redis.zadd self.class.key, score, member
end
def pop
member = redis.zrange(self.class.key, 0, 0).first
# try to remove the first member from the set so it doesn't
# get processed by another job. If it's already been
# removed then try and get the next member
if redis.zrem(self.class.key, member)
member
else
# get the next member
pop
end
end
def count
redis.zcard self.class.key
end
def flush
redis.zremrangebyrank self.class.key, 0, -1
end
def top size
redis.zrange self.class.key, 0, size - 1
end
def rank member
redis.zrank self.class.key, member
end
def score member
redis.zscore self.class.key, member
end
def all
redis.zrange self.class.key, 0, -1
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment