Multi-threaded SQS Ruby worker. I have tests. Maybe this should be a gem? Let me know if you find it useful.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'aws-sdk' | |
require 'json' | |
module SQSW
  # Thin wrapper around a single Amazon SQS queue, built on the legacy
  # AWS SDK v1 API (AWS::SQS). It exposes the `enq`/`deq` pair that
  # SQSW::Worker expects from its s_queue.
  class SQueue
    # Raised when no usable queue URL is given, or when the queue behind the
    # URL does not exist.
    MissingQueue = Class.new(StandardError)

    # Looks up the queue for `url` and keeps a handle to it.
    #
    # url - String URL of the SQS queue.
    #
    # Raises MissingQueue when url is nil or blank, or when the queue does
    # not exist.
    def initialize(url = nil)
      # nil.to_s is "", so this single check covers nil, "" and whitespace.
      raise(MissingQueue, "Missing queue_url") if url.to_s.strip.empty?

      @q = find_queue(url)
    end

    # Fetches the queue object for `url` from the SDK and verifies it exists.
    # Also sets the queue's wait_time_seconds to 5 (presumably to enable long
    # polling on receive -- confirm against the SDK docs).
    #
    # Returns the SDK queue object.
    # Raises MissingQueue when the queue does not exist.
    def find_queue(url)
      sqs.queues[url].tap do |queue|
        raise(MissingQueue, "No SQS queue found at #{url}") unless queue.exists?

        queue.wait_time_seconds = 5
      end
    end

    # Serializes `args` to JSON and sends it as one message.
    def enq(args)
      @q.send_message(JSON.dump(args))
    end

    # Receives one message from the queue; returns whatever the SDK's
    # receive_message returns.
    def deq
      @q.receive_message
    end

    # Memoized SQS client. Credentials come from the SQS_ID and SQS_SECRET
    # environment variables; a RuntimeError is raised if either is unset.
    def sqs
      @sqs ||= AWS::SQS.new(
        access_key_id: ENV.fetch('SQS_ID') { raise("Must set SQS_ID") },
        secret_access_key: ENV.fetch('SQS_SECRET') { raise("Must set SQS_SECRET") }
      )
    end
  end
end
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'thread' | |
module SQSW
  # Multi-threaded worker that drains an SQS-style queue.
  #
  # One producer thread copies messages from `s_queue` into an in-process
  # Queue; `num_workers` consumer threads take messages from it, call
  # `perform` with the decoded JSON payload, and delete the message when
  # `perform` returns without raising.
  #
  # `perform` is not defined in this class; it is expected to be supplied by
  # a subclass. `log` is expected to come from the Logging mixin.
  class Worker
    include Logging

    # Seconds the producer pauses after a failed receive before polling
    # again, so a persistent queue error does not become a busy loop.
    POLL_ERROR_BACKOFF = 1

    # Enqueues a job, or runs it inline in the Rails test environment.
    #
    # In test mode the job is executed synchronously on a memoized worker
    # backed by a plain in-memory Queue. Otherwise `args` is pushed onto the
    # memoized worker's s_queue for a running worker to pick up.
    def self.perform_async(*args)
      if Rails.env.test?
        @test_worker ||= new(s_queue: Queue.new)
        @test_worker.perform(*args)
      else
        @worker ||= new
        @worker.s_queue.enq(args)
      end
    end

    attr_reader :s_queue

    # Create a new Worker data structure. No threads are started.
    #
    # num_workers - number of consumer threads #start will create.
    # s_queue     - anything that responds to `enq` and `deq`. s_queue#deq
    #               must return a thing that has a body, an id, and responds
    #               to `delete`. Defaults to an SQueue for the queue URL.
    # queue_url   - URL used to build the default SQueue. When omitted, a
    #               @queue_url set beforehand (e.g. by a subclass before
    #               calling super) is used, as before.
    def initialize(num_workers: 10, s_queue: nil, queue_url: nil)
      @queue_url = queue_url if queue_url
      @s_queue = s_queue || SQueue.new(@queue_url)
      @msg_queue = Queue.new
      @num_workers = num_workers
      @running = false
      @rmut = Mutex.new
    end

    # Creates producer/consumer threads and blocks until all of them exit.
    def start
      @rmut.synchronize { @running = true }

      @producer = Thread.new do
        Thread.current[:sqs_worker_id] = 'producer'
        poll
      end

      @consumers = @num_workers.times.map do |i|
        Thread.new do
          Thread.current[:sqs_worker_id] = i
          work
        end
      end

      [*@consumers, @producer].each(&:join)
    end

    # Instructs producer/consumers to not take on new messages.
    # Calling this method will unblock the call to #start once every thread
    # has finished its current loop iteration.
    def stop
      @rmut.synchronize { @running = false }
    end

    # Installs INT and TERM handlers that shut the worker down.
    def install_traps
      %w[INT TERM].each do |sig|
        trap(sig) do
          # Mutex operations are not permitted inside a trap handler. #stop
          # takes @rmut, and the logger may lock as well, so both run on a
          # fresh thread instead of in the handler itself.
          Thread.new do
            log(trp: "Received #{sig}. Shutting down.")
            stop
          end
        end
      end
    end

    private

    # Messages are read from s_queue and copied to msg_queue.
    def poll
      log(start: 'polling sqs')
      while running?
        begin
          log(at: 'waiting for messages')
          if (msg = @s_queue.deq)
            log(at: 'receive_message', message_id: msg.id)
            @msg_queue.enq(msg)
          end
        rescue StandardError => e
          # A failed receive must not kill the producer thread: the consumers
          # would keep idling and #start would never return.
          log(at: 'poll_error', error: e.to_s)
          sleep(POLL_ERROR_BACKOFF)
        end
      end
      log(finish: 'polling sqs')
    end

    # Thread-safe read of the running flag.
    def running?
      @rmut.synchronize { @running }
    end

    # Consumer loop: handle one message at a time, sleeping briefly when the
    # in-process queue is empty.
    def work
      log(start: 'working')
      while running?
        if (m = take_msg)
          handle_msg(m)
        else
          sleep(0.5)
        end
      end
      log(finish: 'working')
    end

    # Non-blocking pop from msg_queue. Returns nil when the queue is empty
    # (Queue#deq(true) signals that by raising ThreadError).
    def take_msg
      @msg_queue.deq(true)
    rescue ThreadError
      nil
    end

    # Decodes the JSON body of `m`, runs #perform with it, and deletes the
    # message on success.
    #
    # A JSON object is passed as keyword arguments (keys symbolized); any
    # other payload is splatted as positional arguments. Errors are logged
    # and swallowed, and the message is not deleted in that case.
    def handle_msg(m)
      payload = JSON.parse(m.body)
      if payload.is_a?(Hash)
        # Splatting a Hash with `*` would turn it into [key, value] pairs,
        # so keyword-style payloads need `**`.
        perform(**symbolize_payload_keys(payload))
      else
        perform(*payload)
      end
      m.delete
      log(at: 'message_deleted', message_id: m.id)
    rescue => e
      log(error: e.to_s)
    end

    # Returns a copy of `hash` with its top-level keys converted to symbols.
    def symbolize_payload_keys(hash)
      hash.each_with_object({}) { |(key, value), out| out[key.to_sym] = value }
    end

    # Identifies the current thread by the id assigned in #start. Not called
    # within this class; presumably consumed by the Logging mixin (verify).
    def log_data
      { sqs_worker_id: Thread.current[:sqs_worker_id] }
    end
  end
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Check this out: http://vertx.io/