@neektza
Last active August 29, 2015 14:02
Ruby concurrency blog post example
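require 'bundler/setup'
require 'rubygems'
require 'celluloid'
require 'celluloid/io'
require 'bunny'      # RabbitMQ client used by Commander
require 'multi_json' # JSON parsing used by Commander
require 'sequel'     # database access used by Publisher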
# Defining a supervision group via inheritance.
# Publisher, Commander and SourceSupervisor must already be defined
# when this class body is evaluated.
class Scraper < Celluloid::SupervisionGroup
  supervise Publisher, as: :publisher
  supervise Commander, as: :commander
  supervise SourceSupervisor, as: :source_supervisor,
            args: [["some_source_name", {:polling => ..., :streaming => ...}]]
end
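class Commander
  include Celluloid

  def initialize
    @rabbit = Bunny.new(RABBIT_URI).start
    # ... initialization of an exchange (@x) and a queue (@q)
    listen
  end

  # Subscribes to the queue and restarts the named source on every message.
  def listen
    @q.subscribe do |delivery_info, properties, payload|
      # MultiJson returns string keys by default, so ask for symbols
      # to make msg[:source_name] work.
      msg = MultiJson.load(payload, :symbolize_keys => true)
      # Assumes an actor is registered under the source's name.
      Celluloid::Actor[msg[:source_name]].restart_source
    end
  end
end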
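class Publisher
  include Celluloid

  def initialize
    # Sequel.postgres takes the database name and an options hash.
    # DB_HOST, DB_USER and DB_PASSWORD are placeholder constants,
    # assumed to be defined elsewhere (like RABBIT_URI).
    @db = Sequel.postgres('blog', :host => DB_HOST, :user => DB_USER, :password => DB_PASSWORD)
    @table = @db[:some_table]
  end

  # Inserts one scraped item into the table.
  def publish(data)
    @table.insert(:tag => data[:tag], :content => data[:content])
  end
end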
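class SourceSupervisor
  include Celluloid

  # source_config is a [name, endpoints] pair, as passed via args: in Scraper.
  def initialize(source_config)
    @source_name, @source_endpoints = source_config
    # explicitly defining a supervision group
    @endpoints = SupervisionGroup.new
    start_source
  end

  def restart_source
    stop_source
    start_source
  end

  def start_source
    @endpoints.supervise_as(actor_name(:streamer), Streamer, @source_endpoints[:streaming])
    @endpoints.supervise_as(actor_name(:poller), Poller, @source_endpoints[:polling])
  end

  # Looks up each endpoint actor by its registered name and terminates it if present.
  # The lookup result is assigned first: a local variable assigned inside a
  # trailing `if` condition is not yet defined in the statement before it.
  def stop_source
    [:streamer, :poller].each do |type|
      actor = Celluloid::Actor[actor_name(type)]
      actor.terminate if actor
    end
  end

  # Builds the registry name for an endpoint, e.g. :some_source_name_streamer.
  def actor_name(actor_type)
    "#{@source_name}_#{actor_type}".to_sym
  end
end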
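
The restart-by-name pattern in SourceSupervisor can be tried without Celluloid or any running services. The following is only a minimal sketch under stated assumptions: `REGISTRY`, `FakeEndpoint`, `SourceSketch` and `ENDPOINT_KEYS` are hypothetical stand-ins (a plain Hash instead of the Celluloid actor registry, plain objects instead of actors), and the source name and endpoint values are made up for illustration.

```ruby
# Hypothetical stand-in for the actor registry: a plain Hash keyed by symbol.
REGISTRY = {}

# Hypothetical mapping from endpoint type to its key in the source config.
ENDPOINT_KEYS = { :streamer => :streaming, :poller => :polling }

# Hypothetical worker that registers itself under a name and can be terminated.
class FakeEndpoint
  attr_reader :name, :config

  def initialize(name, config)
    @name = name
    @config = config
    REGISTRY[name] = self
  end

  # Removes this endpoint from the registry, mimicking a terminated actor.
  def terminate
    REGISTRY.delete(@name)
  end
end

# Plain-Ruby sketch of the start/stop/restart logic, without supervision.
class SourceSketch
  def initialize(source_config)
    @source_name, @source_endpoints = source_config
    start_source
  end

  def restart_source
    stop_source
    start_source
  end

  def start_source
    ENDPOINT_KEYS.each do |type, key|
      FakeEndpoint.new(actor_name(type), @source_endpoints[key])
    end
  end

  def stop_source
    ENDPOINT_KEYS.each_key do |type|
      endpoint = REGISTRY[actor_name(type)]
      endpoint.terminate if endpoint
    end
  end

  def actor_name(actor_type)
    "#{@source_name}_#{actor_type}".to_sym
  end
end

source = SourceSketch.new(["feed", { :streaming => "stream-url", :polling => "poll-url" }])
first = REGISTRY[:feed_streamer]
source.restart_source

puts REGISTRY.keys.inspect                   # [:feed_streamer, :feed_poller]
puts first.equal?(REGISTRY[:feed_streamer])  # false
```

After the restart the same names are registered again, but they point at fresh endpoint objects, which is the effect restart_source is after. The sketch leaves out supervision entirely, so it says nothing about how Celluloid itself reacts when a supervised actor is terminated.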