Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
heartbeat actor w/ celluloid
require 'rbczmq'
require 'fileutils'
require 'celluloid'
class Heartbeat
include Celluloid
def self.start(id = nil, options = nil)
new(ZMQ::Context.new, id, options)
end
attr_reader :id, :nodes, :pub, :sub
def initialize(context, id = nil, options = nil)
options ||= {}
@root = options[:root] || "/tmp/zmq-node"
FileUtils.mkdir_p @root
@context = context
@id = (id || Process.pid).to_s
@pub = context.bind(:PUB, "ipc://#{@root}/#{@id}")
@nodes = {}
@sockets = {}
@threshold = (options[:threshold] || 60).to_i
end
def check(id)
@nodes[id.to_s]
end
def socket(id)
@sockets[id.to_s]
end
def publish
@pub.send "hb"
end
def add(id)
id_s = id.to_s
@nodes[id_s] = Time.now.to_i
@sockets[id_s] = sub = @context.connect(:SUB, "ipc://#{@root}/#{id_s}")
sub.subscribe 'hb'
end
def remove(id)
id_s = id.to_s
@nodes.delete id_s
sub = @sockets.delete(id_s)
sub.close
end
def receive
now = Time.now.to_i
@sockets.each do |id, socket|
while msg = socket.recv_nonblock
@nodes[id] = now
end
end
end
def stale
now = Time.now.to_i
ids = []
@nodes.each do |id, time|
if @threshold < (now - time)
ids << id
end
end
ids
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment