Skip to content

Instantly share code, notes, and snippets.

class OBS < RuoteAMQP::Participant
def consume( stuff )
super stuff, :command => '/obs/build', :queue => 'obs', :reply_queue => "ruote_workitems"
end
end
class IMG < ParticipantWrapper
def route
{ :command => '/img/image',
:queue => 'img',
}
end
end
def determine_forget( workitem )
return workitem['fields']['params']['forget'] if workitem['fields']['params'].has_key?( 'forget' )
return workitem['fields']['params']['participant_options']['forget'] if workitem['fields']['params']['participant_options'].has_key?( 'forget' )
false
end
def determine_value( workitem, value )
return @workitem['fields']['params'][value] if @workitem['fields']['params'].has_key?( value )
return @workitem['fields']['params']['participant_options']['command'] if @workitem['fields']['params'].has_key?( 'participant_options' ) and @workitem['fields']['params']['participant_options'].has_key?( value )
nil
end
[10:04] <lbt> also... "if message = ( workitem.fields['message'] || workitem.fields['params']['message'] )"
[10:04] <lbt> I don't think we should look inside fields?
[10:05] <kennethkalmer> not too sure honestly, need to review all the patches and get a working flow on my side to see
[10:05] <lbt> that's a perfectly valid thing for a participant to use without expecting odd side effects
[10:05] <lbt> ... I'm wondering if fields should have a namespace thing going on somehow?
[10:06] <lbt> so you can safely write to anything in your namespace and 'publish' well known information there...
[10:06] <kennethkalmer> fields are just hashes that you populate with what your application requires
[10:06] <kennethkalmer> the beauty is the simplicity
[10:07] <lbt> yes.... and they'd stay that way... but by convention write to field->step_name->name = value
[10:07] <lbt> in the same way we have fields->params and fields->participant_options
root@amqpvm:~# rabbitmqctl list_exchanges
Listing exchanges ...
amq.rabbitmq.log topic
amq.match headers
amq.headers headers
amq.topic topic
amq.direct direct
amq.fanout fanout
direct
...done.
import simplejson as json
from copy import deepcopy
class FlowExpressionId(object):
"""
The FlowExpressionId (fei for short) is an process expression identifier.
Each expression when instantiated gets a unique fei.
Feis are also used in workitems, where the fei is the fei of the
[participant] expression that emitted the workitem.
david@ash:/maemo/devel/obs/boss/ruote/ruote$ grep -ir plist .
grep: ./examples/.#README: No such file or directory
./lib/ruote/context.rb: 's_plist' => [
./lib/ruote/exp/fe_cursor.rb: # In this simplistic example, the process will flow from author to reviewer
./lib/ruote/exp/fe_participant.rb: participant_info = @context.plist.lookup_info(h.participant_name)
./lib/ruote/exp/fe_participant.rb: pa = @context.plist.lookup(
./lib/ruote/exp/flowexpression.rb: ep = context.plist.lookup(fei['engine_id'])
./lib/ruote/part/dispatch_pool.rb: participant = @context.plist.lookup(msg['participant_name'])
./lib/ruote/part/dispatch_pool.rb: participant = @context.plist.lookup(msg['participant_name'])
./lib/ruote/engine.rb: pa = @context.plist.register(regex, participant, opts, block)
david@ash:/maemo/devel/obs/boss/ruote/ruote/lib$ irb
irb(main):001:0> require "ruote/engine"
=> true
irb(main):002:0> method(:plist)
NameError: undefined method `plist' for class `Object'
from (irb):2:in `method'
from (irb):2
from :0
irb(main):003:0>
class EngineParticipant
include Ruote::LocalParticipant
def initialize (opts)
@opts = opts
end
def consume (wi)
if wi.fields.has_key? 'command'
case wi.lookup "command"
when "register"