Created
December 7, 2015 15:41
-
-
Save jazzytomato/5155c92b70295d7665f8 to your computer and use it in GitHub Desktop.
Helper for the Resque workers to interact with Redis
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Helpers mixed into Resque workers (and the code that orchestrates them) to
# talk to Redis: pub/sub based job synchronisation, short-lived key storage and
# job enqueueing.
module WorkerHelper
  # Time-to-live applied to every key written through {#store}.
  REDIS_EXPIRY_TIME = 20.minutes

  # Lazily built logger writing to STDOUT, one "[SEVERITY] time: message" line
  # per entry.
  #
  # @return [Logger]
  def logger
    @logger ||= Logger.new(STDOUT).tap do |log|
      log.formatter = proc do |severity, datetime, _progname, msg|
        "[#{severity}] #{datetime}: #{msg}\n"
      end
    end
  end

  # Runs the given block once the subscription to +channel+ is active, then
  # waits in a background thread until +ok_msg+ is published on that channel.
  #
  # Each call opens its own Redis connection: a subscribed connection is
  # blocked for the whole subscription, so sharing the memoized {#redis}
  # connection would serialize concurrent subscriptions and stall any
  # +publish+/+store+ issued through it in the meantime.
  #
  # @yield The work to do asynchronously
  # @param [String, #read] channel the channel to subscribe to
  # @param [String, #read] ok_msg the message to wait for
  # @return [Thread] a thread that is waiting for the ok message from the given block
  # @raise [RuntimeError] inside the returned thread (surfaced by +join+) when
  #   a message other than +ok_msg+ is received; the message is the error text
  def thread_sub(channel, ok_msg = channel)
    Thread.new do
      subscriber = Redis::Store.new
      begin
        subscriber.subscribe(channel) do |on|
          # Only start the work once we are listening, so the ok message
          # cannot be published before the subscription exists.
          on.subscribe { yield }
          on.message do |_chan, msg|
            raise(msg) unless msg == ok_msg

            subscriber.unsubscribe
          end
        end
      ensure
        # Release the dedicated connection whether the wait succeeded or not.
        subscriber.quit
      end
    end
  end

  # Blocks until every given thread has finished. An exception raised inside
  # one of the threads is re-raised here by +Thread#join+.
  #
  # @param [Enumerable<Thread>] jobs threads, e.g. as returned by {#thread_sub}
  # @return [Enumerable<Thread>] the given collection
  def wait_for(jobs)
    jobs.each(&:join)
  end

  # Publishes +msg+ on +channel+; by default the channel name itself is sent,
  # which matches the default +ok_msg+ of {#thread_sub}.
  #
  # @param [String] channel the channel to publish on
  # @param [String] msg the message to publish
  def publish(channel, msg = channel)
    redis.publish(channel, msg)
  end

  # Stores +data+ under +key+ with a time-to-live of {REDIS_EXPIRY_TIME}.
  #
  # @param [String] key the Redis key
  # @param [Object] data the value to store
  def store(key, data)
    # Pass plain integer seconds rather than a duration object.
    redis.setex(key, WorkerHelper::REDIS_EXPIRY_TIME.to_i, data)
  end

  # Memoized Redis connection used for regular (non-subscribe) commands.
  #
  # @return [Redis::Store]
  def redis
    @redis ||= Redis::Store.new
  end

  # Enqueues +job+ on Resque, or performs it synchronously in the current
  # process when the queue is configured to run immediately.
  #
  # @param [Class] job a job class responding to +perform+
  # @param [Array] params the arguments forwarded to the job
  def enqueue(job, *params)
    if run_queue_immediately?
      job.perform(*params)
    else
      Resque.enqueue(job, *params)
    end
  end

  private

  # Whether jobs should be performed inline instead of going through Resque:
  # always in the Rails test environment, otherwise when the
  # RUN_QUEUE_IMMEDIATELY environment variable is set to anything other than
  # an explicit "off" value (empty, "0", "false", "no", "off").
  #
  # @return [Boolean]
  def run_queue_immediately?
    return true if Rails.env.test?

    flag = ENV['RUN_QUEUE_IMMEDIATELY'].to_s.strip.downcase
    !['', '0', 'false', 'no', 'off'].include?(flag)
  end
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment