Skip to content

Instantly share code, notes, and snippets.

@mcallaway
Created November 2, 2012 21:43
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mcallaway/4004514 to your computer and use it in GitHub Desktop.
Save mcallaway/4004514 to your computer and use it in GitHub Desktop.
1 #!/usr/bin/env ruby
2
3 require 'rubygems'
4 require 'ruote'
5 require 'ruote/storage/fs_storage'
6 require 'amqp'
7 require 'ruote-amqp'
8
# Connection settings written into AMQP.settings before anything connects:
# the guest/guest account on localhost, root vhost.
broker_settings = {
  :user  => 'guest',
  :pass  => 'guest',
  :host  => 'localhost',
  :vhost => '/'
}
broker_settings.each do |option, value|
  AMQP.settings[option] = value
end
13
# Makes sure an EventMachine reactor is running, starting one in a background
# thread (kept in @em) when necessary.
#
# Does nothing beyond logging when a reactor is already up, including one that
# was started elsewhere (e.g. by an enclosing EventMachine.run). A reactor
# thread that has died is replaced by a fresh one.
#
# Returns nil.
def ensure_em_is_running
  puts "Ensure EM"
  return if EM.reactor_running?

  @em = Thread.new { EM.run {} } unless @em && @em.alive?

  # Poll until the reactor reports itself as running instead of relying on a
  # fixed sleep. Give up after 5 seconds, or as soon as the thread has died,
  # so a failed start cannot block the caller forever.
  deadline = Time.now + 5
  sleep 0.01 until EM.reactor_running? || !@em.alive? || Time.now >= deadline
  nil
end
21
# Counts the live objects whose class is exactly one of +classes+.
#
# classes - Array of classes to look for; defaults to the AMQP queue, channel
#           and exchange classes. Instances of subclasses are not counted.
#
# Returns a Hash mapping class name (String) to the number of live instances.
# Classes without any live instance are left out of the Hash.
def count_amqp_objects(classes = [AMQP::Queue, AMQP::Channel, AMQP::Exchange])
  classes.each_with_object({}) do |klass, counts|
    # Restricting the heap walk to klass keeps BasicObject instances (which do
    # not respond to #class) out of the iteration and avoids touching every
    # object in the process.
    total = ObjectSpace.each_object(klass).count { |obj| obj.instance_of?(klass) }
    counts[klass.to_s] = total if total > 0
  end
end
40
# Prints one "ClassName: count" line for every entry returned by
# count_amqp_objects, in ascending order of class name.
def display_amqp_object_count
  tallies = count_amqp_objects
  sorted_names = tallies.keys.sort

  sorted_names.each { |name| puts "#{name}: #{tallies[name]}" }
end
49
# Receiver subclass whose handler only reports that a message arrived.
class MyReceiver < Ruote::Amqp::Receiver
  # Ignores both arguments, writes a fixed notice to $stdout and flushes it
  # so the line shows up immediately.
  def handle(headers, payload)
    $stdout.puts('msg received')
    $stdout.flush
  end
end
56
# Participant that simulates five seconds of work on each workitem.
class Worker < Ruote::Amqp::Participant
  # Logs any 'message' field already present on the workitem, sleeps for
  # 5 seconds, overwrites the 'message' field with a canned greeting and
  # then replies.
  def on_workitem
    incoming = workitem.fields['message']
    puts "got a message #{incoming}" unless incoming.nil?

    puts "start work..."
    sleep(5)

    workitem.fields['message'] = { 'text' => 'hello !', 'author' => 'Alice' }
    puts "done"

    reply
  end
end
69
70
71 # Connection control example
72 #Ruote::Amqp.session = AMQP.connect(:auto_recovery => true) do |con|
73 # con.on_recovery do |con|
74 # puts "Recovered..."
75 # end
76 # connection.on_tcp_connection_loss do |con, settings|
77 # puts "Reconnecting... please wait"
78 # conn.reconnect(false, 20)
79 # end
80 #end
81 #Ruote::Amqp.session = AMQP.connect(:auto_recovery => true) do |con|
82 # puts "session check"
83 # con.on_recovery do |con|
84 # puts "Recovered..."
85 # end
86 # connection.on_tcp_connection_loss do |con, settings|
87 # puts "Reconnecting... please wait"
88 # conn.reconnect(false, 20)
89 # end
90 #end
91
# Main script: builds a ruote dashboard, registers an AMQP-backed participant,
# launches the 'test' process and shuts down once that process has terminated.
EventMachine.run do
  # preparing the dashboard (worker + file storage under 'ruote_work')
  dashboard = Ruote::Dashboard.new(
    Ruote::Worker.new(
      Ruote::FsStorage.new('ruote_work')
    )
  )
  dashboard.noisy = true

  #class Worker < Ruote::Participant

  connection = AMQP.connect(:host => '127.0.0.1')
  puts "Connecting to AMQP broker. Running #{AMQP::VERSION} version of the gem..."
  channel = AMQP::Channel.new(connection)
  # NOTE(review): queue and exchange are not used below; the calls are kept
  # because declaring the queue is a side effect on the broker.
  queue = channel.queue("/", :auto_delete => true)
  exchange = channel.default_exchange

  # map participants to classes
  # I think forget is important, see amqp/participant.rb

  dashboard.register :sample, Ruote::Amqp::Participant, :exchange => ['direct', ''], :routing_key => 'alpha', :forget => true

  #receiver = Ruote::Amqp::Receiver.new(dashboard, AMQP::Channel.new.queue('ruote_workitems'), :launch_only => true)
  receiver = Ruote::Amqp::Receiver.new(dashboard, AMQP::Channel.new.queue('alpha'), :launch_only => true)

  #dashboard.register :alpha, Ruote::Amqp::Participant, :exchange => ['direct', ''], :routing_key => 'alpha', :forget => true
  #dashboard.register :alpha, Worker, :exchange => ['direct', ''], :routing_key => 'alpha', :forget => true
  #dashboard.register :bravo, Worker, :exchange => ['direct', ''], :routing_key => 'alpha', :forget => true

  # defining a process: one step, ten concurrent steps, then a final step
  pdef = Ruote.process_definition :name => 'test' do
    sample :command => 'toss/sample/sub', :timeout => '30s', :on_timeout => 'redo'
    concurrent_iterator :times => 10 do
      sample :command => 'toss/sample/sub', :timeout => '30s', :on_timeout => 'redo'
    end
    sample :command => 'toss/sample/sub', :timeout => '30s', :on_timeout => 'redo'
  end

  # launching, creating a process instance
  wfid = dashboard.launch(pdef)

  # wait_for blocks the calling thread until our process instance terminates.
  # Calling it directly here would block the reactor thread, so no AMQP I/O
  # could make progress while waiting (the likely cause of the earlier
  # "hangs?" observation). Run the wait on EventMachine's thread pool instead
  # and shut down from the reactor thread once it returns.
  EventMachine.defer(
    proc { dashboard.wait_for(wfid) },
    proc { |_outcome| connection.close { EventMachine.stop } }
  )
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment