Skip to content

Instantly share code, notes, and snippets.

@ryandotsmith
Last active August 29, 2015 14:07
Show Gist options
  • Star 6 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ryandotsmith/39f9315fc2d5c42f60b0 to your computer and use it in GitHub Desktop.
Save ryandotsmith/39f9315fc2d5c42f60b0 to your computer and use it in GitHub Desktop.
Multi-threaded SQS Ruby Worker. I have tests. Maybe this is a gem? Lemme know if you find it useful.
require 'aws-sdk'
require 'json'
module SQSW
class SQueue
MissingQueue = Class.new(StandardError)
def initialize(url = nil)
raise(MissingQueue, "Missing queue_url") if url.nil?
@q = find_queue(url)
end
def find_queue(url)
sqs.queues[url].tap do |q|
raise(MissingQueue) unless q.exists?
q.wait_time_seconds = 5
end
end
def enq(args)
@q.send_message(JSON.dump(args))
end
def deq
@q.receive_message
end
def sqs
@sqs ||= AWS::SQS.new(
access_key_id: (ENV['SQS_ID'] || raise("Must set SQS_ID")),
secret_access_key: (ENV['SQS_SECRET'] || raise("Must set SQS_SECRET"))
)
end
end
end
require 'thread'
module SQSW
class Worker
include Logging
def self.perform_async(*args)
if Rails.env.test?
@test_worker ||= new(s_queue: Queue.new)
@test_worker.perform(*args)
else
@worker ||= new
@worker.s_queue.enq(args)
end
end
attr_reader :s_queue
# Create a new Worker data structure. No threads are started.
# s_queue can be anything that responds to `enq` and `deq`
# s_queue#deq must return a thing that has a body, id-
# and responds to `delete`.
def initialize(num_workers:10, s_queue:nil)
@s_queue = s_queue || SQueue.new(@queue_url)
@msg_queue = Queue.new
@num_workers = num_workers
@running = false
@rmut = Mutex.new
end
# Creates producer/consumers threads.
def start
@rmut.synchronize {@running = true}
@producer = Thread.new do
Thread.current[:sqs_worker_id] = 'producer'
poll
end
@consumers = @num_workers.times.map do |i|
Thread.new do
Thread.current[:sqs_worker_id] = i
work
end
end
[@consumers, @producer].flatten.map(&:join)
end
# Instructs producer/consumers to not take on new messages.
# Calling this method will unblock the call to #start.
def stop
@rmut.synchronize {@running = false}
end
def install_traps
%W(INT TERM).each do |sig|
trap(sig) do
log(trp: "Received #{sig}. Shutting down.")
Thread.new{stop}
end
end
end
private
# Messages are read from s_queue and copied to msg_queue
def poll
log(start: 'polling sqs')
while running?
log(at: 'waiting for messages')
if msg = @s_queue.deq
log(at: 'receive_message', message_id: msg.id)
@msg_queue.enq(msg)
end
end
log(finish: 'polling sqs')
end
def running?
@rmut.synchronize{@running}
end
def work
log(start: 'working')
while running?
if m = take_msg
handle_msg(m)
else
sleep(0.5)
end
end
log(finish: 'working')
end
def take_msg
@msg_queue.deq(true)
rescue ThreadError
end
def handle_msg(m)
args = JSON.parse(m.body)
args = args.symbolize_keys if args.is_a?(Hash)
perform(*args)
m.delete
log(at: 'message_deleted', message_id: m.id)
rescue => e
log(error: e.to_s)
end
def log_data
{sqs_worker_id: Thread.current[:sqs_worker_id]}
end
end
end
@litch
Copy link

litch commented Oct 12, 2014

Check this out: http://vertx.io/

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment