Skip to content

Instantly share code, notes, and snippets.

@schmurfy
Last active August 29, 2015 14:07
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save schmurfy/d3ba8cdbc4c1f943cdb2 to your computer and use it in GitHub Desktop.
Save schmurfy/d3ba8cdbc4c1f943cdb2 to your computer and use it in GitHub Desktop.
concurrent-ruby experimentations
require 'rubygems'
require 'bundler/setup'
require 'concurrent'
logger = Logger.new($stderr) #
# Concurrent.configuration.logger = lambda do |level, progname, message = nil, &block|
# logger.add level, message, progname, &block
# end #
class Master < Concurrent::Actor::RestartingContext
def initialize
p [:master, :created]
@listeners = []
end
def on_message(msg_parts)
cmd, *args = *msg_parts
case cmd
when :subscribe
@listeners << envelope.sender
when :crash
raise "wtf"
when :data_received
@listeners.each{|actor| actor << msg_parts }
when :listeners
@listeners
else
pass
end
end
end
class Listener < Concurrent::Actor::RestartingContext
def initialize(master)
p [:listener, Concurrent::Actor.name, :started]
master << :link
master << :subscribe
end
def on_message(msg_parts)
p [:ss, msg_parts]
cmd, *args = *msg_parts
case cmd
when :data_received
p [:listener, Concurrent::Actor.name, :data, args[0]]
else
pass
end
end
end #
master = Master.spawn(name: 'master', supervise: true)
listener = Listener.spawn(name: 'listener1', supervise: true, args: [master])
master << [:data_received, 42]
master << :crash
sleep 0.1
master << [:data_received, 32]
master << :crash
master << [:data_received, 32]
sleep
require 'concurrent'
class DataWriter < Concurrent::Actor::Context
def initialize(dispatcher)
dispatcher << :subscribe
end
def on_message(msg)
case msg[0]
when :msg
p [:yeah, msg[1]]
end
end
end
dispatcher = Concurrent::Actor::Utils::Broadcast.spawn! 'event-dispatcher'
writer1 = DataWriter.spawn(:one, dispatcher)
writer2 = DataWriter.spawn(:two, dispatcher)
dispatcher << [:msg, 'asd']
sleep
require 'rubygems'
require 'bundler/setup'
require 'concurrent'
class Crasher < Concurrent::Actor::RestartingContext
def initialize
p [:create]
end
def on_message(msg)
case msg
when :crash
raise "arrggggg"
when :print
p [:hello]
end
end
end
actor = Crasher.spawn!(:bob)
actor << :print
actor << :crash
sleep 1
actor << :print
sleep
@pitr-ch
Copy link

pitr-ch commented Oct 21, 2014

does following work for you?

require 'concurrent'
class DataWriter < Concurrent::Actor::Context
  def initialize(dispatcher)
    dispatcher << :subscribe
  end

  def on_message(msg)
    case msg[0]
    when :msg
      p [:yeah, msg[1]]
    end
  end
end

dispatcher = Concurrent::Actor::Utils::Broadcast.spawn! 'event-dispatcher'
writer1    = DataWriter.spawn(:one, dispatcher)
writer2    = DataWriter.spawn(:two, dispatcher)
dispatcher << [:msg, 'asd']

sleep

@pitr-ch
Copy link

pitr-ch commented Oct 28, 2014

To fix it you need to see:

then:

class Crasher < Concurrent::Actor::RestartingContext
  def initialize
    p [:create]
  end


  def on_message(msg)
    case msg
    when :crash
      raise "arrggggg"

    when :print
      p [:hello]
    end
  end
end

actor = Crasher.spawn!(name: 'bob', supervise: true)
actor << :print
actor << :crash
sleep 0.1
actor << :print

should fix it

@pitr-ch
Copy link

pitr-ch commented Nov 12, 2014

I've polished just a little bit:

class Master < Concurrent::Actor::RestartingContext
  def initialize
    p [path, :created]
    @listeners = []
  end

  def on_message(msg_parts)
    cmd, *args = *msg_parts
    case cmd
    when :subscribe
      @listeners << envelope.sender

    when :crash
      raise "wtf"

    when :broadcast
      @listeners.each { |actor| actor << msg_parts }

    when :listeners
      @listeners

    else
      pass
    end
  end

end

class Listener < Concurrent::Actor::RestartingContext
  def initialize(master)
    p [path, :created]
    master << :link << :subscribe
  end

  def on_message(msg_parts)
    p [:listener_got, msg_parts]
    cmd, *args = *msg_parts
    case cmd
    when :reset
      # reset! when master does
      behaviour!(Concurrent::Actor::Behaviour::Pausing).reset!
    when StandardError
      # pause! when master does
      behaviour!(Concurrent::Actor::Behaviour::Pausing).pause!
    when :broadcast
      p [path, :data, args[0]]
    else
      pass
    end
  end

end #

master   = Master.spawn(name: 'master', supervise: true)
# gets auto subscribed again on reset
listener = Listener.spawn(name: 'listener1', supervise: true, args: [master])

master << [:broadcast, 42] << :crash
sleep 0.1
master << [:broadcast, 32] << :crash << [:broadcast, 32]

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment