Skip to content

Instantly share code, notes, and snippets.

@tooky
Created October 20, 2008 12:56
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save tooky/18063 to your computer and use it in GitHub Desktop.
Save tooky/18063 to your computer and use it in GitHub Desktop.
workling.rb
# Merb boot hook: once the application has loaded, configure the Workling
# client from config/starling.yml and require every worker file found under
# app/workers. Logs an error and exits with status 1 when the config file is
# missing.
Merb::BootLoader.after_app_loads do
  config_path = Merb.root / 'config' / 'starling.yml'
  load_path = Merb.root / 'app' / 'workers'

  # File.exist? replaces File.exists?, which is deprecated and was removed in
  # Ruby 3.2.
  if File.exist?(config_path)
    # The file is keyed by environment symbol; only the section for the
    # current environment becomes the client options.
    Workling::Client.options = YAML.load_file(config_path)[Merb.environment.to_sym]
    # Load workers recursively so nested directories are picked up as well.
    Dir.glob(load_path / '**' / '*.rb').each { |wling| require wling }
  else
    Merb.logger.error! "No starling.yml file found in #{Merb.root}/config."
    exit(1)
  end
end
# Starling connection settings, one section per Merb environment.
# The boot hook selects a section with Merb.environment.to_sym and
# Workling::Client reads :host and :port from it, so both levels use symbol
# keys and host/port must be nested under their environment.
# Other keys read by the code when present: :memcache_options, :sleep_time,
# :reset_time and :listeners.
:development:
  :host: 127.0.0.1
  :port: 22122
:test:
  :host: 127.0.0.1
  :port: 22122
:production:
  :host: 127.0.0.1
  :port: 22122
module Workling
# Root of the Workling error hierarchy; a StandardError, so a plain `rescue`
# catches it.
class WorklingError < StandardError
end

# Specialised WorklingError. Judging by its name it signals a worker that
# could not be found; nothing in this file raises it.
class WorklingNotFoundError < WorklingError
end
# Base class for workers. Subclasses are recorded in +discovered+ as they are
# defined, and each of their own instance methods (except +create+) is exposed
# as a queue. Calling <tt>Worker.async_foo(args)</tt> or
# <tt>Worker.asynch_foo(args)</tt> enqueues +args+ for the worker's +foo+.
class Base
  # Matches the async_/asynch_ prefix of an enqueueing call and captures the
  # worker method name. (The previous pattern, /^async?_/, made the "c"
  # optional and so matched "asyn_" instead of "asynch_".)
  ASYNC_CALL = /\Aasynch?_(.*)/

  # Every class that has inherited from Base, in definition order.
  cattr_accessor :discovered
  @@discovered = []

  # Returns the Client shared by all workers, creating it on first use.
  def self.client
    @@client ||= Client.new
  end

  # Builds the queue name for a worker class and method, e.g. the tableized
  # class name and the method name joined by "__".
  #
  # klass  - worker class or class name.
  # method - method name as a String or Symbol.
  def self.queue_for(klass, method)
    # to_s: instance_methods yields Symbols on Ruby 1.9+.
    # Don't split with : because it messes up memcache stats
    (klass.to_s.tableize / method.to_s).split("/").join("__")
  end

  # Extracts the worker method name (the last "__" segment) from a queue name.
  def self.method_name(queue)
    queue.split("__").last
  end

  # Ruby hook: records each new subclass so the poller can find it.
  def self.inherited(subclass)
    discovered << subclass
  end

  # Enqueues +options+ on the queue for +klass+ and +method+.
  #
  # Returns nil.
  # Raises Workling::WorklingError when the memcache client fails, so the
  # failure propagates to the caller.
  def self.run(klass, method, options = {})
    client.set(queue_for(klass, method), options)
    nil # empty.
  rescue MemCache::MemCacheError => e
    # failed to enqueue, raise a workling error so that it propagates upwards
    raise Workling::WorklingError, "#{e.class} - #{e.message}"
  end

  # Turns async_<name> / asynch_<name> class-method calls into +run+ calls;
  # anything else falls through to the default behaviour.
  def self.method_missing(method, *args, &block)
    if method.to_s =~ ASYNC_CALL
      run(self.to_s.dasherize, $1, *args)
    else
      super
    end
  end

  # Keeps respond_to? consistent with method_missing.
  def self.respond_to_missing?(method, include_private = false)
    method.to_s =~ ASYNC_CALL ? true : super
  end

  # Returns a Hash of queue name => handler instance, one entry (with its own
  # fresh instance) per instance method defined directly on this class,
  # excluding +create+. Memoized per class.
  def self.queues
    @queues ||= instance_methods(false).inject({}) do |table, method|
      # Skip the create method. `next table` keeps the accumulator; a bare
      # `next` would pass nil to the following iteration. to_s covers both
      # String (1.8) and Symbol (1.9+) method names.
      next table if method.to_s == 'create'

      table[queue_for(self, method)] = new
      table
    end
  end

  def create
    # Put worker initialize code in here. This is good for restarting jobs that
    # were interrupted.
  end
end
# Wrapper around a ::MemCache connection built from the class-level +options+
# (:host, :port and optional :memcache_options). Calls it does not define
# itself are delegated to the connection.
class Client
  # Connection settings shared by all clients; assigned by the boot hook.
  cattr_accessor :options
  # "host:port" string the connection was opened with.
  attr_accessor :starling_url
  # The underlying ::MemCache instance.
  attr_accessor :connection

  # Opens the connection. Expects Client.options to have been set already.
  def initialize
    settings = self.class.options
    @starling_url = [settings[:host], settings[:port]].compact.join(':')
    # :memcache_options is optional; compact drops it when absent.
    memcache_args = [starling_url, settings[:memcache_options]].compact
    @connection = ::MemCache.new(*memcache_args)
  end

  # Delegates unknown calls (get, set, reset, ...) to the connection,
  # forwarding the block too so block-taking calls keep working.
  def method_missing(method, *args, &block)
    @connection.send(method, *args, &block)
  end

  # Keeps respond_to? consistent with the delegation in method_missing.
  def respond_to_missing?(method, include_private = false)
    @connection.respond_to?(method, include_private) || super
  end

  # Returns the connection's stats.
  def stats
    @connection.stats
  end
end
# Polls the queues of every discovered worker class, one thread per class,
# and hands each dequeued message to the matching worker method.
class Poller
  cattr_accessor :sleep_time # Seconds to sleep before looping
  cattr_accessor :reset_time # Seconds to wait while resetting connection

  # Reads the polling intervals from Client.options, defaulting to 2 seconds
  # between polls and 30 seconds before resetting a failed connection.
  def initialize
    Poller.sleep_time = Workling::Client.options[:sleep_time] || 2
    Poller.reset_time = Workling::Client.options[:reset_time] || 30
    @workers = ThreadGroup.new
  end

  def logger
    Merb.logger
  end

  # Starts one listener thread per discovered worker class and blocks until
  # all of them have finished.
  def listen
    # Create a thread for each worker.
    Workling::Base.discovered.each do |klass|
      @workers.add(Thread.new(klass) { |c| klass_listen(c) })
    end

    # Wait for all workers to complete
    @workers.list.each { |t| t.join }
  end

  # gracefully stop processing: each listener checks this thread-local flag
  # at the top of its loop.
  def stop
    @workers.list.each { |w| w[:shutdown] = true }
  end

  ##
  ## Thread procs
  ##

  # Listen for one worker class until the thread's :shutdown flag is set.
  def klass_listen(klass)
    logger.debug("Listener thread #{klass.name} started")

    # Read thread configuration if available
    thread_sleep_time = listener_sleep_time(klass)

    # Setup connection to starling (one per thread)
    connection = Client.new
    puts "** Starting Client for #{klass.name} queue"

    # Start dispatching those messages
    until Thread.current[:shutdown]
      begin
        # Keep MySQL connection alive
        # unless ActiveRecord::Base.connection.active?
        #   unless ActiveRecord::Base.connection.reconnect!
        #     logger.fatal("FAILED - Database not available")
        #     break
        #   end
        # end

        # Dispatch and process the messages
        n = dispatch!(connection, klass)
        logger.debug("Listener thread #{klass.name} processed #{n.to_s} queue items") if n > 0
        # Use the per-listener interval; the global one was used here before,
        # which silently ignored the :listeners configuration.
        sleep(thread_sleep_time) unless n > 0

      # If there is a memcache error, hang for a bit to give it a chance to fire up again
      # and reset the connection.
      rescue MemCache::MemCacheError
        logger.warn("Listener thread #{klass.name} failed to connect to memcache. Resetting connection.")
        sleep(self.class.reset_time)
        connection.reset
      end
    end

    logger.debug("Listener thread #{klass.name} ended")
  end

  # Dispatcher for one worker class. Will throw MemCacheError if unable to connect.
  # Any other failure while handling a message is logged and does not stop the
  # remaining queues from being polled.
  # Returns the number of worker methods called
  def dispatch!(connection, klass)
    n = 0
    klass.queues.each do |queue_name, handler|
      # Reset per queue so a failure never logs the previous queue's message.
      result = nil
      begin
        result = connection.get(queue_name)
        if result
          n += 1
          handler.send(klass.method_name(queue_name), result)
        end
      rescue MemCache::MemCacheError => e
        logger.error("FAILED to connect with queue #{queue_name}: #{e}")
        raise e
      rescue SystemExit, SignalException
        # Never swallow shutdown requests.
        raise
      rescue Exception => e
        # Deliberately broad: a failing job must not kill the listener thread.
        logger.error("FAILED to process queue #{queue_name}. #{handler} could not handle invocation of #{Base.method_name(queue_name)} with #{result.inspect}: #{e}.\n#{e.backtrace.join("\n")}")
      end
    end

    n
  end

  private

  # Returns the poll interval for +klass+: the :sleep_time configured under
  # Client.options[:listeners][klass name] when present, otherwise the global
  # sleep_time.
  def listener_sleep_time(klass)
    listeners = Client.options[:listeners]
    listener = listeners && listeners[klass.to_s]
    configured = listener && listener.symbolize_keys[:sleep_time]
    configured || self.class.sleep_time
  end
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment