Skip to content

Instantly share code, notes, and snippets.

@dagda1
Created January 6, 2012 17:06

Revisions

  1. Paul revised this gist Jan 6, 2012. 1 changed file with 5 additions and 6 deletions.
    11 changes: 5 additions & 6 deletions em_defer.rb
    Original file line number Diff line number Diff line change
    @@ -41,13 +41,13 @@ def start_consumer
    channel.prefetch(1)

    requests_queue.subscribe(:ack => true) do |header, body|
    header.ack

    puts "we have a message at #{Time.now}"

    url_search = MultiJson.decode(body)

    pub = Proc.new do
    header.ack

    lead = get_lead(n, (n == 5))

    puts "about to publish #{n} message is_last = #{lead.is_last} at #{Time.now}"
    @@ -58,7 +58,6 @@ def start_consumer
    :correlation_id => header.correlation_id
    )

    puts "before next tick #{n}"
    EM.next_tick(pub) if n <= 5
    n += 1
    end
    @@ -68,7 +67,7 @@ def start_consumer
    callback = Proc.new do |x, y|
    puts "in callback"
    end
    operation = proc{ header.ack;puts "in operation" }
    operation = proc{ puts "in operation" }

    EM.defer(operation, callback)
    end
    @@ -101,8 +100,8 @@ def publish(urlSearch, routing_key)
    callback_queue = AMQP::Channel.new(connection).queue("", :exclusive => false)

    callback_queue.subscribe do |header, body|
    header.ack


    debugger
    lead = safe_json_decode(body)

    puts "company = #{lead["company"]} is_last = #{lead["is_last"]} received at #{Time.now}"
  2. Paul created this gist Jan 6, 2012.
    150 changes: 150 additions & 0 deletions em_defer.rb
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,150 @@
    require 'thor'
    require 'amqp'
    require 'json'
    # examples
    # __dir = File.join(File.dirname(File.expand_path(__FILE__)), "..")
    # require File.join(__dir, "example_helper")
    # /Users/paulcowan/.rvm/gems/jruby-1.6.5/gems/amq-client-0.8.4/examples
    module Worker
    module App
    module Cli

    class Development < Thor
    namespace "worker:dev"
    @conf = {
    :host => "localhost",
    :user => "guest",
    :password => "guest",
    :vhost => "/",
    :logging => true,
    :port => 5672
    }

    desc "start_consumer", "start the test consumer"
    def start_consumer

    n = 1

    AMQP.start(@conf) do |connection|

    channel = AMQP::Channel.new(connection)

    requests_queue = channel.queue("one", :exclusive => true, :auto_delete => true)
    requests_queue.purge

    Signal.trap("INT") do
    connection.close do
    EM.stop{exit}
    end
    end

    channel.prefetch(1)

    requests_queue.subscribe(:ack => true) do |header, body|
    puts "we have a message at #{Time.now}"

    url_search = MultiJson.decode(body)

    pub = Proc.new do
    header.ack

    lead = get_lead(n, (n == 5))

    puts "about to publish #{n} message is_last = #{lead.is_last} at #{Time.now}"

    AMQP::Exchange.default.publish(
    MultiJson.encode(lead),
    :routing_key => header.reply_to,
    :correlation_id => header.correlation_id
    )

    puts "before next tick #{n}"
    EM.next_tick(pub) if n <= 5
    n += 1
    end

    EM.next_tick(pub)

    callback = Proc.new do |x, y|
    puts "in callback"
    end
    operation = proc{ header.ack;puts "in operation" }

    EM.defer(operation, callback)
    end

    puts " [x] Awaiting RPC requests"
    end
    end

    desc "send_message", "send test message"
    def send_message
    urlSearch = {
    :url => "http://someurl.com/"
    }

    puts urlSearch.inspect

    publish(urlSearch, "one")
    end

    no_tasks do

    def publish(urlSearch, routing_key)
    EM.run do
    corr_id = rand(10_000_000).to_s

    requests ||= Hash.new

    connection = AMQP.connect(@conf)

    callback_queue = AMQP::Channel.new(connection).queue("", :exclusive => false)

    callback_queue.subscribe do |header, body|
    header.ack

    lead = safe_json_decode(body)

    puts "company = #{lead["company"]} is_last = #{lead["is_last"]} received at #{Time.now}"

    if lead["is_last"]
    puts "in exit"
    connection.close do
    EM.stop{exit}
    end
    end
    end

    callback_queue.append_callback(:declare) do
    AMQP::Exchange.default.publish(MultiJson.encode(urlSearch), :routing_key => routing_key, :reply_to => callback_queue.name, :correlation_id => corr_id)
    end

    puts "initial message sent"
    end
    end

    def get_lead(index, is_last)
    lead = Lead.new
    lead.company = "Company #{index}"
    lead.street_address_1 = "2 Lena Street"
    lead.street_address_2 = "Ravenscroft"
    lead.post_code = "BT7 1DD"
    lead.town = "Belfast"
    lead.country = "uk"
    lead.phone_1 = "0208 77139283"
    lead.fax = "0208 39303939"
    lead.url = "http://thesoftwaresimpleton.blogspot.com/s"
    lead.contacts << Contact.new(:title => "Managing Director", :name => "Bob Cratchet")
    lead.web = "http://www.mcburney.cowan.com/"
    lead.emails << Email.new(:email => "dagda1@scotalt.net")
    lead.emails << Email.new(:email => "paul.cowan@continuity2.com")
    lead.is_last = is_last
    lead
    end
    end #no_tasks

    end

    end
    end
    end