Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
1 #!/usr/bin/env ruby
2
3 require 'rubygems'
4 require 'ruote'
5 require 'ruote/storage/fs_storage'
6 require 'amqp'
7 require 'ruote-amqp'
8
# Default broker credentials and location used by the amqp gem.
{
  :user  => 'guest',
  :pass  => 'guest',
  :host  => 'localhost',
  :vhost => '/'
}.each do |setting, value|
  AMQP.settings[setting] = value
end
13
# Prints a notice, then starts an EventMachine reactor in a background thread
# unless one was already started through this helper (tracked in @em).
def ensure_em_is_running
  puts "Ensure EM"
  return if @em

  @em = Thread.new { EM.run {} }
  # Short pause after spawning the thread, presumably so the reactor is up
  # before the caller continues.
  sleep 0.5
end
21
# Counts the live AMQP::Queue, AMQP::Channel and AMQP::Exchange objects.
#
# Only exact instances are counted; instances of subclasses are ignored.
# A class with no live instance gets no entry in the result.
#
# Returns a Hash mapping the class name (String) to its instance count.
def count_amqp_objects
  counts = {}

  [AMQP::Queue, AMQP::Channel, AMQP::Exchange].each do |klass|
    total = 0

    # Ask ObjectSpace for this class only instead of visiting every object
    # in the process; instance_of? then filters out subclass instances.
    ObjectSpace.each_object(klass) do |obj|
      total += 1 if obj.instance_of?(klass)
    end

    counts[klass.to_s] = total if total > 0
  end

  counts
end
40
# Prints one "ClassName: count" line per entry returned by
# count_amqp_objects, ordered by class name.
def display_amqp_object_count
  counts = count_amqp_objects
  sorted_names = counts.keys.sort

  sorted_names.each { |name| puts "#{name}: #{counts[name]}" }
end
49
# Receiver subclass whose handler only reports that a message arrived.
class MyReceiver < Ruote::Amqp::Receiver
  # Prints a notice and flushes stdout immediately.
  # Neither +headers+ nor +payload+ is inspected.
  def handle(headers, payload)
    $stdout.puts "msg received"
    $stdout.flush
  end
end
56
# Participant that reports any message found on the workitem, pauses for
# five seconds, stores a canned message on the workitem and replies.
class Worker < Ruote::Amqp::Participant
  # Handles the current workitem.
  def on_workitem
    incoming = workitem.fields['message']
    puts "got a message #{incoming}" unless incoming.nil?

    puts "start work..."
    sleep(5)
    workitem.fields['message'] = { 'text' => 'hello !', 'author' => 'Alice' }
    puts "done"

    reply
  end
end
69
70
# Connection control example (disabled)
#Ruote::Amqp.session = AMQP.connect(:auto_recovery => true) do |conn|
#  conn.on_recovery do |_conn|
#    puts "Recovered..."
#  end
#  conn.on_tcp_connection_loss do |lost_conn, _settings|
#    puts "Reconnecting... please wait"
#    lost_conn.reconnect(false, 20)
#  end
#end
# Same example, with a "session check" trace printed when the block runs
#Ruote::Amqp.session = AMQP.connect(:auto_recovery => true) do |conn|
#  puts "session check"
#  conn.on_recovery do |_conn|
#    puts "Recovered..."
#  end
#  conn.on_tcp_connection_loss do |lost_conn, _settings|
#    puts "Reconnecting... please wait"
#    lost_conn.reconnect(false, 20)
#  end
#end
91
EventMachine.run do
  # Build the engine step by step: file storage -> worker -> dashboard.
  storage = Ruote::FsStorage.new('ruote_work')
  worker = Ruote::Worker.new(storage)
  dashboard = Ruote::Dashboard.new(worker)
  dashboard.noisy = true

  # Open a broker connection, then a channel, a queue and the default
  # exchange on it.
  connection = AMQP.connect(:host => '127.0.0.1')
  puts "Connecting to AMQP broker. Running #{AMQP::VERSION} version of the gem..."
  channel = AMQP::Channel.new(connection)
  queue = channel.queue("/", :auto_delete => true)
  exchange = channel.default_exchange

  # Map the :sample participant name to the AMQP participant class.
  # Author's note: :forget seems to matter here, see amqp/participant.rb.
  dashboard.register(
    :sample,
    Ruote::Amqp::Participant,
    :exchange => ['direct', ''],
    :routing_key => 'alpha',
    :forget => true
  )

  # Receiver listening on the 'alpha' queue, on a channel of its own.
  # Alternative queue that was tried: 'ruote_workitems'.
  alpha_queue = AMQP::Channel.new.queue('alpha')
  receiver = Ruote::Amqp::Receiver.new(dashboard, alpha_queue, :launch_only => true)

  # Alternative registrations that were tried:
  #dashboard.register :alpha, Ruote::Amqp::Participant, :exchange => ['direct', ''], :routing_key => 'alpha', :forget => true
  #dashboard.register :alpha, Worker, :exchange => ['direct', ''], :routing_key => 'alpha', :forget => true
  #dashboard.register :bravo, Worker, :exchange => ['direct', ''], :routing_key => 'alpha', :forget => true

  # Process definition: one :sample step, ten concurrent :sample steps,
  # then a final :sample step.
  pdef = Ruote.process_definition(:name => 'test') do
    sample :command => 'toss/sample/sub', :timeout => '30s', :on_timeout => 'redo'
    concurrent_iterator :times => 10 do
      sample :command => 'toss/sample/sub', :timeout => '30s', :on_timeout => 'redo'
    end
    sample :command => 'toss/sample/sub', :timeout => '30s', :on_timeout => 'redo'
  end

  # Launch a process instance, then block the current thread until that
  # instance terminates.
  wfid = dashboard.launch(pdef)
  dashboard.wait_for(wfid)

  # Stopping the reactor here was tried and left disabled ("hangs?"):
  #AMQP.stop { EventMachine.stop }
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.