Skip to content

Instantly share code, notes, and snippets.

@bleonard
Last active August 29, 2015 13:57
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save bleonard/9372841 to your computer and use it in GitHub Desktop.
Save bleonard/9372841 to your computer and use it in GitHub Desktop.
Resque Bus
bus = ResqueBusSpec.watch
# do stuff
events = bus.all_queued("my_event")
events.size.should == 2
event = events.first.attributes
event["action"].should == "whatever"
event = events.second.attributes
event["action"].should == "other"
bus = ResqueBusSpec.watch
post "/api/thing"
expect(response.status).to eq(200)
event = bus.first_queued("the_event_type")
event = event.attributes
expect(event["user_id"]).to eq(user.id)
ResqueBus.publish("this_event", "user_id" => 3)
expect(MyClass).to receive(:save!)
ResqueBusSpec.perform_all! # run it.
FactoryGirl.create(:my_object) # publishes on create
ResqueBusSpec.perform_all! # work it
# check it's in search index or whatever
require "resque-bus"
module ResqueBusSpec
class QueueItem
attr_reader :queue, :klass, :event_type, :attributes
def initialize(queue, klass, attributes)
@queue = queue
@klass = klass
@klass = Object.const_get(@klass) if @klass.is_a?(String)
@event_type = attributes["bus_event_type"].to_s
@attributes = attributes.with_indifferent_access
end
def perform!
@klass.perform(attributes)
end
def match?(event_type, attributes = {})
return false unless event_type == self.event_type
return true if attributes.empty?
sub = attributes.slice(*attributes.keys)
sub == attributes
end
end
class QueueWatcher
def initialize
subscribe
end
def subscribe
ResqueBusSpec.listeners << self
end
def unsubscribe
ResqueBusSpec.listeners.delete(self)
end
def queued_list
@queued ||= []
end
def worked_list
@worked ||= []
end
def queued(item)
self.queued_list << item
end
def worked(item)
self.worked_list << item
end
def all_queued(event_type, attributes = {})
queued_list.select{ |item| item.match?(event_type, attributes)}
end
def all_worked(event_type, attributes = {})
worked_list.select{ |item| item.match?(event_type, attributes)}
end
def worked?(event_type, attributes = {})
all_worked(event_type, attributes).size > 0
end
def published?(event_type, attributes = {})
all_queued(event_type, attributes).size > 0
end
def first_queued(event_type)
all_queued(event_type).first
end
end
extend self
def watch
QueueWatcher.new
end
def queues
# empty array
@queues ||= Hash.new{ |hash, key| hash[key] = [] }
end
def listeners
@listeners ||= []
end
def enqueue_to(queue, klass, *args)
attributes = args.last
item = QueueItem.new(queue, klass, attributes)
if attributes["bus_rider_sub_key"] == "resque_bus_spec_helper"
listeners.each do |listener|
listener.queued(item)
end
else
self.queues[queue] << item
end
end
def work_queue_order
# pick one at random (no set order at the moment)
queues.keys.shuffle
end
def work_one(*args)
keys = work_queue_order
keys = args.flatten if args.size > 0
work_queue_order.each do |queue|
if queues.has_key?(queue) && (item = queues[queue].shift)
item.perform!
listeners.each do |listener|
listener.worked(item)
end
return true
end
end
return false
end
def perform_all(*args)
count = 0
while self.work_one(*args)
count += 1
end
count
end
def perform_all!
perform_all(*queues.keys)
end
def reset!
@queues = nil
@listeners = nil
end
end
module ResqueBus
class << self
def enqueue_to(queue, klass, *args)
ResqueBusSpec.enqueue_to(queue, klass, *args)
end
end
end
ResqueBus.local_mode = :standalone
# need to subscribe to everything to get it to queue
ResqueBus.dispatch("spec") do
subscribe "resque_bus_spec_helper", "bus_event_type" => :key do |attributes|
end
end
RSpec.configure do |config|
# stuff
require_relative 'support/resque_bus_spec_helper'
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment