Skip to content

Instantly share code, notes, and snippets.

@dagda1
Created January 6, 2012 17:06
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dagda1/1571453 to your computer and use it in GitHub Desktop.
Save dagda1/1571453 to your computer and use it in GitHub Desktop.
em_defer.rb
require 'thor'
require 'amqp'
require 'json'
# examples
# __dir = File.join(File.dirname(File.expand_path(__FILE__)), "..")
# require File.join(__dir, "example_helper")
# /Users/paulcowan/.rvm/gems/jruby-1.6.5/gems/amq-client-0.8.4/examples
module Worker
  module App
    module Cli
      # Thor tasks for exercising an AMQP request/reply round trip during
      # development: `start_consumer` answers requests arriving on the "one"
      # queue, and `send_message` publishes a test request and prints the
      # replies it receives.
      class Development < Thor
        namespace "worker:dev"

        # Broker connection settings shared by every task.
        #
        # These used to be assigned to `@conf` in the class body. That creates
        # an instance variable on the class object itself, whereas the task
        # methods run on instances, where `@conf` is nil, so the settings were
        # never handed to AMQP. A constant is visible from both scopes.
        CONNECTION_SETTINGS = {
          :host => "localhost",
          :user => "guest",
          :password => "guest",
          :vhost => "/",
          :logging => true,
          :port => 5672
        }.freeze

        # Number of leads published in reply to one request. The final lead is
        # the one flagged with `is_last = true`.
        LEADS_PER_REQUEST = 5

        desc "start_consumer", "start the test consumer"
        # Consumes requests from the exclusive, auto-deleted queue "one" and
        # answers each with LEADS_PER_REQUEST leads published to the request's
        # `reply_to` queue, one lead per reactor tick.
        def start_consumer
          AMQP.start(connection_settings) do |connection|
            channel = AMQP::Channel.new(connection)
            requests_queue = channel.queue("one", :exclusive => true, :auto_delete => true)
            requests_queue.purge

            # Ctrl-C: close the broker connection, then stop the reactor.
            Signal.trap("INT") do
              connection.close do
                EM.stop{exit}
              end
            end

            channel.prefetch(1)

            requests_queue.subscribe(:ack => true) do |header, body|
              header.ack
              puts "we have a message at #{Time.now}"

              # The decoded request is not used by this test consumer; the
              # call is kept so the payload is still parsed as before.
              MultiJson.decode(body)

              # The counter is per request, so every request gets leads
              # 1..LEADS_PER_REQUEST and exactly one lead flagged as last.
              n = 1
              pub = Proc.new do
                lead = get_lead(n, (n == LEADS_PER_REQUEST))
                puts "about to publish #{n} message is_last = #{lead.is_last} at #{Time.now}"
                AMQP::Exchange.default.publish(
                  MultiJson.encode(lead),
                  :routing_key => header.reply_to,
                  :correlation_id => header.correlation_id
                )
                # Advance first, then decide whether to reschedule, so that
                # nothing is published after the lead marked as last.
                n += 1
                EM.next_tick(pub) if n <= LEADS_PER_REQUEST
              end
              EM.next_tick(pub)

              # Demonstration of EM.defer: run `operation`, then `callback`.
              operation = proc { puts "in operation" }
              callback = proc { |_result| puts "in callback" }
              EM.defer(operation, callback)
            end

            puts " [x] Awaiting RPC requests"
          end
        end

        desc "send_message", "send test message"
        # Publishes a fixed test request to the "one" queue.
        def send_message
          url_search = {
            :url => "http://someurl.com/"
          }
          puts url_search.inspect
          publish(url_search, "one")
        end

        no_tasks do
          # @return [Hash] a mutable copy of CONNECTION_SETTINGS, so the
          #   frozen constant is never handed to library code directly.
          def connection_settings
            CONNECTION_SETTINGS.dup
          end

          # Publishes `urlSearch` as JSON to `routing_key` via the default
          # exchange and prints every reply received on a private callback
          # queue. Closes the connection and stops the reactor once a reply
          # with a truthy "is_last" arrives.
          #
          # @param urlSearch [Hash] request payload, encoded with MultiJson
          # @param routing_key [String] routing key the request is sent to
          def publish(urlSearch, routing_key)
            EM.run do
              corr_id = rand(10_000_000).to_s
              connection = AMQP.connect(connection_settings)
              # NOTE(review): the empty queue name presumably lets the broker
              # choose one; the request is only sent after :declare, when
              # callback_queue.name is read for reply_to.
              callback_queue = AMQP::Channel.new(connection).queue("", :exclusive => false)

              callback_queue.subscribe do |_header, body|
                # NOTE(review): safe_json_decode is not defined in this file;
                # presumably provided elsewhere - confirm it is in scope.
                lead = safe_json_decode(body)
                puts "company = #{lead["company"]} is_last = #{lead["is_last"]} received at #{Time.now}"
                if lead["is_last"]
                  puts "in exit"
                  connection.close do
                    EM.stop{exit}
                  end
                end
              end

              callback_queue.append_callback(:declare) do
                AMQP::Exchange.default.publish(
                  MultiJson.encode(urlSearch),
                  :routing_key => routing_key,
                  :reply_to => callback_queue.name,
                  :correlation_id => corr_id
                )
                # Logged here, after the publish call, rather than before the
                # queue has even been declared.
                puts "initial message sent"
              end
            end
          end

          # Builds a canned Lead for test replies.
          #
          # @param index [Integer] number embedded in the company name
          # @param is_last [Boolean] stored on the lead as `is_last`
          # @return [Lead] the populated lead
          def get_lead(index, is_last)
            lead = Lead.new
            lead.company = "Company #{index}"
            lead.street_address_1 = "2 Lena Street"
            lead.street_address_2 = "Ravenscroft"
            lead.post_code = "BT7 1DD"
            lead.town = "Belfast"
            lead.country = "uk"
            lead.phone_1 = "0208 77139283"
            lead.fax = "0208 39303939"
            lead.url = "http://thesoftwaresimpleton.blogspot.com/s"
            lead.contacts << Contact.new(:title => "Managing Director", :name => "Bob Cratchet")
            lead.web = "http://www.mcburney.cowan.com/"
            lead.emails << Email.new(:email => "dagda1@scotalt.net")
            lead.emails << Email.new(:email => "paul.cowan@continuity2.com")
            lead.is_last = is_last
            lead
          end
        end # no_tasks
      end
    end
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment