Created
November 2, 2012 21:43
-
-
Save mcallaway/4004514 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
1 #!/usr/bin/env ruby | |
2 | |
3 require 'rubygems' | |
4 require 'ruote' | |
5 require 'ruote/storage/fs_storage' | |
6 require 'amqp' | |
7 require 'ruote-amqp' | |
8 | |
# Connection settings stored on AMQP.settings: guest/guest on localhost,
# root vhost.
{
  :user  => 'guest',
  :pass  => 'guest',
  :host  => 'localhost',
  :vhost => '/'
}.each do |key, value|
  AMQP.settings[key] = value
end
13 | |
# Starts an EventMachine reactor in a background thread the first time it is
# called; later calls only print the banner.
#
# The thread is remembered in @em. After spawning it, the caller sleeps for
# half a second before returning.
def ensure_em_is_running
  puts "Ensure EM"
  return if @em

  @em = Thread.new { EM.run {} }
  sleep 0.5
end
21 | |
# Counts the live AMQP::Queue, AMQP::Channel and AMQP::Exchange objects in
# this process.
#
# Only exact instances of those three classes are counted; instances of
# their subclasses are ignored.
#
# @return [Hash{String => Integer}] class name => number of live instances;
#   a class with no live instance has no entry
def count_amqp_objects
  [AMQP::Queue, AMQP::Channel, AMQP::Exchange].each_with_object({}) do |klass, objects|
    # Ask ObjectSpace for one class at a time: it then only yields instances
    # of that class (or a subclass), so #class / #instance_of? is never sent
    # to a BasicObject instance, which would raise NoMethodError.
    total = ObjectSpace.each_object(klass).count { |o| o.instance_of?(klass) }

    objects[klass.to_s] = total if total > 0
  end
end
40 | |
# Prints one "ClassName: count" line for every entry returned by
# count_amqp_objects, ordered by class name.
def display_amqp_object_count
  tally = count_amqp_objects
  names = tally.keys.sort

  names.each { |name| puts "#{name}: #{tally[name]}" }
end
49 | |
# Receiver subclass whose handler only reports that a message arrived.
class MyReceiver < Ruote::Amqp::Receiver
  # Prints a fixed notice and flushes stdout; neither argument is inspected.
  def handle(headers, payload)
    $stdout.puts "msg received"
    $stdout.flush
  end
end
56 | |
# Participant that simulates five seconds of work on each workitem.
class Worker < Ruote::Amqp::Participant
  # Reports any 'message' field already present on the workitem, sleeps for
  # five seconds, overwrites the 'message' field and then replies.
  def on_workitem
    incoming = workitem.fields['message']
    puts "got a message #{incoming}" unless incoming.nil?

    puts "start work..."
    sleep(5)
    workitem.fields['message'] = { 'text' => 'hello !', 'author' => 'Alice' }
    puts "done"

    reply
  end
end
69 | |
70 | |
71 # Connection control example | |
72 #Ruote::Amqp.session = AMQP.connect(:auto_recovery => true) do |con| | |
73 # con.on_recovery do |con| | |
74 # puts "Recovered..." | |
75 # end | |
76 # connection.on_tcp_connection_loss do |con, settings| | |
77 # puts "Reconnecting... please wait" | |
78 # conn.reconnect(false, 20) | |
79 # end | |
80 #end | |
81 #Ruote::Amqp.session = AMQP.connect(:auto_recovery => true) do |con| | |
82 # puts "session check" | |
83 # con.on_recovery do |con| | |
84 # puts "Recovered..." | |
85 # end | |
86 # connection.on_tcp_connection_loss do |con, settings| | |
87 # puts "Reconnecting... please wait" | |
88 # conn.reconnect(false, 20) | |
89 # end | |
90 #end | |
91 | |
# Main script: builds a ruote dashboard on file storage, registers an AMQP
# participant and receiver, launches the 'test' process and shuts down once
# that process instance terminates.
EventMachine.run do
  # preparing the dashboard
  dashboard = Ruote::Dashboard.new(
    Ruote::Worker.new(
      Ruote::FsStorage.new('ruote_work')
    )
  )
  dashboard.noisy = true

  #class Worker < Ruote::Participant

  connection = AMQP.connect(:host => '127.0.0.1')
  puts "Connecting to AMQP broker. Running #{AMQP::VERSION} version of the gem..."
  channel = AMQP::Channel.new(connection)
  # The queue object is not used afterwards; the call is kept so the "/"
  # queue is still requested on the channel as before.
  channel.queue("/", :auto_delete => true)

  # map participants to classes
  # I think forget is important, see amqp/participant.rb

  dashboard.register :sample, Ruote::Amqp::Participant, :exchange => ['direct', ''], :routing_key => 'alpha', :forget => true

  #receiver = Ruote::Amqp::Receiver.new(dashboard, AMQP::Channel.new.queue('ruote_workitems'), :launch_only => true)
  Ruote::Amqp::Receiver.new(dashboard, AMQP::Channel.new.queue('alpha'), :launch_only => true)

  #dashboard.register :alpha, Ruote::Amqp::Participant, :exchange => ['direct', ''], :routing_key => 'alpha', :forget => true
  #dashboard.register :alpha, Worker, :exchange => ['direct', ''], :routing_key => 'alpha', :forget => true
  #dashboard.register :bravo, Worker, :exchange => ['direct', ''], :routing_key => 'alpha', :forget => true

  # defining a process
  pdef = Ruote.process_definition :name => 'test' do
    sample :command => 'toss/sample/sub', :timeout => '30s', :on_timeout => 'redo'
    concurrent_iterator :times => 10 do
      sample :command => 'toss/sample/sub', :timeout => '30s', :on_timeout => 'redo'
    end
    sample :command => 'toss/sample/sub', :timeout => '30s', :on_timeout => 'redo'
  end

  # launching, creating a process instance
  wfid = dashboard.launch(pdef)

  # wait_for blocks the calling thread until the process instance terminates.
  # Calling it directly here would stall the reactor, so no AMQP traffic
  # could be processed while waiting. Run it on EventMachine's thread pool
  # instead; once it returns, close the connection and stop the reactor so
  # the script exits.
  # NOTE(review): the channel created by the argument-less AMQP::Channel.new
  # above may sit on a separate default connection; it is only torn down by
  # EventMachine.stop — confirm that is acceptable.
  EventMachine.defer(
    proc { dashboard.wait_for(wfid) },
    proc { connection.close { EventMachine.stop } }
  )
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment