Created
January 6, 2012 17:06
Revisions
-
Paul revised this gist
Jan 6, 2012 . 1 changed file with 5 additions and 6 deletions.There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -41,13 +41,13 @@ def start_consumer channel.prefetch(1) requests_queue.subscribe(:ack => true) do |header, body| header.ack puts "we have a message at #{Time.now}" url_search = MultiJson.decode(body) pub = Proc.new do lead = get_lead(n, (n == 5)) puts "about to publish #{n} message is_last = #{lead.is_last} at #{Time.now}" @@ -58,7 +58,6 @@ def start_consumer :correlation_id => header.correlation_id ) EM.next_tick(pub) if n <= 5 n += 1 end @@ -68,7 +67,7 @@ def start_consumer callback = Proc.new do |x, y| puts "in callback" end operation = proc{ puts "in operation" } EM.defer(operation, callback) end @@ -101,8 +100,8 @@ def publish(urlSearch, routing_key) callback_queue = AMQP::Channel.new(connection).queue("", :exclusive => false) callback_queue.subscribe do |header, body| debugger lead = safe_json_decode(body) puts "company = #{lead["company"]} is_last = #{lead["is_last"]} received at #{Time.now}" -
Paul created this gist
Jan 6, 2012 .There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,150 @@ require 'thor' require 'amqp' require 'json' # examples # __dir = File.join(File.dirname(File.expand_path(__FILE__)), "..") # require File.join(__dir, "example_helper") # /Users/paulcowan/.rvm/gems/jruby-1.6.5/gems/amq-client-0.8.4/examples module Worker module App module Cli class Development < Thor namespace "worker:dev" @conf = { :host => "localhost", :user => "guest", :password => "guest", :vhost => "/", :logging => true, :port => 5672 } desc "start_consumer", "start the test consumer" def start_consumer n = 1 AMQP.start(@conf) do |connection| channel = AMQP::Channel.new(connection) requests_queue = channel.queue("one", :exclusive => true, :auto_delete => true) requests_queue.purge Signal.trap("INT") do connection.close do EM.stop{exit} end end channel.prefetch(1) requests_queue.subscribe(:ack => true) do |header, body| puts "we have a message at #{Time.now}" url_search = MultiJson.decode(body) pub = Proc.new do header.ack lead = get_lead(n, (n == 5)) puts "about to publish #{n} message is_last = #{lead.is_last} at #{Time.now}" AMQP::Exchange.default.publish( MultiJson.encode(lead), :routing_key => header.reply_to, :correlation_id => header.correlation_id ) puts "before next tick #{n}" EM.next_tick(pub) if n <= 5 n += 1 end EM.next_tick(pub) callback = Proc.new do |x, y| puts "in callback" end operation = proc{ header.ack;puts "in operation" } EM.defer(operation, callback) end puts " [x] Awaiting RPC requests" end end desc "send_message", "send test message" def send_message urlSearch = { :url => "http://someurl.com/" } puts urlSearch.inspect publish(urlSearch, "one") end no_tasks do def publish(urlSearch, routing_key) EM.run do corr_id = rand(10_000_000).to_s requests ||= Hash.new connection = AMQP.connect(@conf) callback_queue = AMQP::Channel.new(connection).queue("", :exclusive => false) callback_queue.subscribe do |header, body| header.ack lead = safe_json_decode(body) puts "company = #{lead["company"]} is_last = #{lead["is_last"]} received at #{Time.now}" if lead["is_last"] puts "in exit" connection.close do EM.stop{exit} end end end callback_queue.append_callback(:declare) do AMQP::Exchange.default.publish(MultiJson.encode(urlSearch), :routing_key => routing_key, :reply_to => callback_queue.name, :correlation_id => corr_id) end puts "initial message sent" end end def get_lead(index, is_last) lead = Lead.new lead.company = "Company #{index}" lead.street_address_1 = "2 Lena Street" lead.street_address_2 = "Ravenscroft" lead.post_code = "BT7 1DD" lead.town = "Belfast" lead.country = "uk" lead.phone_1 = "0208 77139283" lead.fax = "0208 39303939" lead.url = "http://thesoftwaresimpleton.blogspot.com/s" lead.contacts << Contact.new(:title => "Managing Director", :name => "Bob Cratchet") lead.web = "http://www.mcburney.cowan.com/" lead.emails << Email.new(:email => "dagda1@scotalt.net") lead.emails << Email.new(:email => "paul.cowan@continuity2.com") lead.is_last = is_last lead end end #no_tasks end end end end