Skip to content

Instantly share code, notes, and snippets.

@Asmod4n
Created August 18, 2014 12:40
Show Gist options
  • Save Asmod4n/81109b231fed68a9add7 to your computer and use it in GitHub Desktop.
Save Asmod4n/81109b231fed68a9add7 to your computer and use it in GitHub Desktop.
First stab at a new high level czmq wrapper in ruby
require 'bundler/setup'
require 'msgpack'
require 'concurrent'
Concurrent::Actor.i_know_it_is_experimental!
logger = Logger.new($stderr)
Concurrent.configuration.logger = lambda do |level, progname, message = nil, &block|
logger.add level, message, progname, &block
end
require_relative 'ffi-czmq'
CZMQ.init
ClientResponse = Struct.new(:uuid, :answer)
ClientRequest = Struct.new(:uuid, :question) do
def send_over(socket, type)
request = CZMQ::Zmsg.new
request << ''
request << type
request << uuid
request << MessagePack.dump(question)
request.send_zmsg(socket)
end
def get_answer_from(socket)
MessagePack.load(CZMQ::Zstr.new(socket).recv)
end
end
ServerRequest = Struct.new(:identities, :uuid, :question)
ServerResponse = Struct.new(:identities, :uuid, :answer) do
def send_over(socket, type)
response = CZMQ::Zmsg.new
identities.each do |identity|
response << identity
end
response << ''
response << type
response << uuid
response << MessagePack.dump(answer)
response.send_zmsg(socket)
end
end
class ServerWorker < CZMQ::Zthread
attr_reader :parent_pipe, :sender
def initialize(parent)
@parent = parent
@parent_pipe = fork_callback(nil, &method(:handle_spawn))
end
private
def handle_spawn(*args, zctx, child_pipe)
@sender = Thread.current.to_s
@zloop = CZMQ::Zloop.new
#@zloop.set_verbose(true)
@worker = CZMQ::Zsocket.new(zctx, CZMQ::Zsocket::DEALER)
cpi = child_pipe.to_pollitem
cpi_err = child_pipe.to_pollitem(CZMQ::Zpoller::POLLERR)
@zloop.set_tolerant(cpi)
@zloop.add_poller(cpi, nil) do |zloop, pollitem, *args|
CZMQ::Zmsg.recv_nowait(pollitem[:socket]).send_zmsg(@worker)
0
end
@zloop.add_poller(cpi_err, nil) do |zloop, pollitem, *args|
zloop.destructor
0
end
wpi = @worker.to_pollitem
wpi_err = @worker.to_pollitem(CZMQ::Zpoller::POLLERR)
@zloop.set_tolerant(wpi)
@zloop.add_poller(wpi, nil) do |zloop, pollitem, *args|
msg = CZMQ::Zmsg.recv_nowait(pollitem[:socket]).to_a
delimiter = msg.index('')
identities = msg[0, delimiter]
payload = msg[delimiter +1..-1]
type, uuid, question = payload
request = ::ServerRequest.new(identities, uuid, MessagePack.load(question))
case type
when 'ask'
answer = @parent.ask(request).value
response = ::ServerResponse.new(identities, uuid, answer)
response.send_over(pollitem[:socket], type)
when 'tell'
@parent << request
end
0
end
@zloop.add_poller(wpi_err, nil) do |zloop, pollitem, *args|
@zloop.destructor
0
end
@worker.connect('inproc://backend')
child_pipe.signal
@zloop.start
@parent << :goodbye
Thread.current.exit
end
end
class ClientWorker < CZMQ::Zthread
attr_reader :parent_pipe, :sender
def initialize(parent, endpoint)
@parent = parent
@endpoint = endpoint
@parent_pipe = fork_callback(nil, &method(:handle_spawn))
end
private
def handle_spawn(*args, zctx, child_pipe)
@sender = Thread.current.to_s
@zloop = CZMQ::Zloop.new
#@zloop.set_verbose(true)
@worker = CZMQ::Zsocket.new(zctx, CZMQ::Zsocket::DEALER)
cpi = child_pipe.to_pollitem
cpi_err = child_pipe.to_pollitem(CZMQ::Zpoller::POLLERR)
@zloop.set_tolerant(cpi)
@zloop.add_poller(cpi, nil) do |zloop, pollitem, *args|
msg = CZMQ::Zmsg.recv_nowait(pollitem[:socket])
type, uuid, question = msg.to_a[1..-1]
if type == 'ask'
@uuid = uuid
end
msg.send_zmsg(@worker)
0
end
@zloop.add_poller(cpi_err, nil) do |zloop, pollitem, *args|
@zloop.destructor
0
end
wpi = @worker.to_pollitem
wpi_err = @worker.to_pollitem(CZMQ::Zpoller::POLLERR)
@zloop.set_tolerant(wpi)
@zloop.add_poller(wpi, nil) do |zloop, pollitem, *args|
msg = CZMQ::Zmsg.recv_nowait(pollitem[:socket]).to_a
delimiter = msg.index('')
identities = msg[0, delimiter]
payload = msg[delimiter + 1..-1]
type, uuid, answer = payload
case type
when 'ask'
if uuid == @uuid
@uuid = nil
CZMQ::Zstr.new(child_pipe).send_zstr(answer)
end
when 'tell'
@parent << ::ClientResponse.new(uuid, MessagePack.load(answer))
end
0
end
@zloop.add_poller(wpi_err, nil) do |zloop, pollitem, *args|
@zloop.destructor
0
end
@worker.connect(@endpoint)
child_pipe.signal
@zloop.start
@parent << :goodbye
Thread.current.exit
end
end
class Server < Concurrent::Actor::Context
include Concurrent::Logging
def initialize
@pipes = {}
if ((cpu_counter = (Concurrent.processor_count - 1))) > 0
cpu_counter.times do
spawn_ioactor
end
else
spawn_ioactor
end
end
def spawn_ioactor
worker = ServerWorker.new(ref)
timeout(5) {
worker.parent_pipe.wait
}
@pipes[worker.sender] = worker.parent_pipe
end
def on_message(msg)
case msg
when :goodbye
@pipes.delete(envelope.sender_path)
when ::ServerRequest
case msg[:question]
when 'hello'
if envelope.ivar
'welcome'
else
response = ::ServerResponse.new(msg[:identities], msg[:uuid], 'welcome')
response.send_over(@pipes[envelope.sender_path], 'tell')
end
end
end
end
end
class Client < Concurrent::Actor::Context
include Concurrent::Logging
def initialize(endpoint)
@endpoint = endpoint
spawn_ioactor
end
def on_message(msg)
case msg
when :goodbye
@worker_pipe = nil
when :endpoint
@endpoint
when ::ClientRequest
if envelope.ivar
msg.send_over(@worker_pipe, 'ask')
msg.get_answer_from(@worker_pipe)
else
msg.send_over(@worker_pipe, 'tell')
end
when ::ClientResponse
puts msg[:answer]
end
end
private
def spawn_ioactor
worker = ClientWorker.new(ref, @endpoint)
timeout(5) {
worker.parent_pipe.wait
}
@worker_pipe = worker.parent_pipe
end
end
backend = CZMQ::Zsocket.new(CZMQ.context, CZMQ::Zsocket::DEALER)
backend.bind('inproc://backend')
frontend = CZMQ::Zsocket.new(CZMQ.context, CZMQ::Zsocket::ROUTER)
port = frontend.bind('tcp://127.0.0.1:*')
proxy = CZMQ::Zproxy.new(CZMQ.context, frontend.to_zsocket, backend.to_zsocket)
server = Server.spawn(:server)
client = Client.spawn(:client, "tcp://localhost:#{port}")
puts client.ask(ClientRequest.new(CZMQ::Zuuid.new, "hello")).value
client << ClientRequest.new(CZMQ::Zuuid.new, "hello")
sleep
require 'thread'
require 'ffi'
module CZMQ
module Utils
extend FFI::Library
ffi_lib 'czmq', 'libzmq', FFI::Library::LIBC
attach_function :zstr_free, :zstr_free, [:pointer], :void, :blocking => true
attach_function :libc_free, :free, [:pointer], :void, :blocking => true
attach_function :zmq_version, :zmq_version, [:pointer, :pointer, :pointer], :void, :blocking => true
attach_function :czmq_version, :zsys_version, [:pointer, :pointer, :pointer], :void, :blocking => true
attach_function :errno, :zmq_errno, [], :int, :blocking => true
attach_function :strerror, :zmq_strerror, [:int], :string, :blocking => true
class << self
def version
unless @version
z_major = FFI::MemoryPointer.new :int
z_minor = FFI::MemoryPointer.new :int
z_patch = FFI::MemoryPointer.new :int
c_major = FFI::MemoryPointer.new :int
c_minor = FFI::MemoryPointer.new :int
c_patch = FFI::MemoryPointer.new :int
zmq_version z_major, z_minor, z_patch
czmq_version c_major, c_minor, c_patch
@version = {
zmq: {:major => z_major.read_int, :minor => z_minor.read_int, :patch => z_patch.read_int},
czmq: {:major => c_major.read_int, :minor => c_minor.read_int, :patch => c_patch.read_int}
}
end
@version
end
def check_for_pointer(ptr)
ptr.is_a?(FFI::Pointer) && !ptr.null?
end
def read_string(ffi_str)
if check_for_pointer(ffi_str)
ffi_str_pointer = FFI::MemoryPointer.new(:pointer)
ffi_str_pointer.write_pointer(ffi_str)
str = ffi_str.read_string
zstr_free(ffi_str_pointer)
str
else
fail IOError, error
end
end
def free(pointer)
if check_for_pointer(pointer)
libc_free(pointer)
else
fail ArgumentError, 'Can only free non NULL Pointers'
end
end
def error
strerror(errno)
end
end
if (version[:zmq][:major] < 4) ||(version[:czmq][:major] < 2)
fail LoadError, 'This needs at least zeromq 4 and czmq 2'
end
end
end
module LibCZMQ
def self.extended(klass)
klass.extend FFI::Library
klass.ffi_lib('czmq', 'libzmq')
end
def czmq_class
return @czmq_class if @czmq_class
@czmq_class = "#{self.singleton_class.inspect.split('::').last[0...-1].downcase}".to_sym
end
def czmq_constructor(constructor_params = [], constructor_extras = nil)
self.module_eval <<-RUBY, __FILE__, __LINE__ + 1
attach_function :constructor, "#{czmq_class}_new", #{constructor_params.inspect}, :pointer, :blocking => true
def self.new_from_czmq_obj(czmq_obj, owned_by_me = true)
if CZMQ::Utils.check_for_pointer(czmq_obj)
instance = allocate
instance.instance_variable_set(:@czmq_obj, czmq_obj)
instance.instance_variable_set(:@owned_by_me, owned_by_me)
instance.send :setup_finalizer
if instance.method(:initialize).parameters.size > 0
instance.send :initialize, czmq_obj
else
instance.send :initialize
end
instance
else
fail ArgumentError, "Didn't pass a valid Pointer"
end
end
def self.new(*args)
instance = allocate
if #{constructor_extras.nil?}
if #{constructor_params.empty?}
czmq_obj = instance.class.constructor
else
czmq_obj = instance.class.constructor(*args)
end
else
if args.first.respond_to?(:"to_#{constructor_extras}") &&
CZMQ::Utils.check_for_pointer(args.first.to_#{constructor_extras})
instance.instance_variable_set("@#{constructor_extras}", args.first)
else
fail ArgumentError, "Didn't pass a valid Pointer"
end
if #{constructor_params.length} == 1
czmq_obj = instance.class.constructor(args.first.to_czmq)
else
czmq_obj = instance.class.constructor(args.first.to_czmq, *args[1..-1])
end
end
!czmq_obj.null? ||fail("Cannot allocate #{czmq_class} because of #{CZMQ::Utils.error}")
instance.instance_variable_set(:@czmq_obj, czmq_obj)
instance.instance_variable_set(:@owned_by_me, true)
instance.send :setup_finalizer
if instance.method(:initialize).parameters.size > 0
instance.send :initialize, *args
else
instance.send :initialize
end
instance
end
def to_#{czmq_class}
@czmq_obj
end
def to_czmq
@czmq_obj
end
RUBY
end
def czmq_destructor(destructor_extras = nil, attach_destructor = true)
self.module_eval <<-RUBY, __FILE__, __LINE__ + 1
if #{attach_destructor}
if #{destructor_extras.nil?}
attach_function :destructor, "#{czmq_class}_destroy", [:pointer], :void, :blocking => true
else
attach_function :destructor, "#{czmq_class}_destroy", [:pointer, :pointer], :void, :blocking => true
end
end
def destructor
Thread.exclusive do
remove_finalizer
if @owned_by_me
unless CZMQ::Zctx.interrupted == 1
if #{destructor_extras.nil?}
if CZMQ::Utils.check_for_pointer(@czmq_obj)
FFI::MemoryPointer.new(:pointer) do |p|
p.write_pointer(@czmq_obj)
self.class.destructor(p)
end
end
else
begin
extras = instance_variable_get("@#{destructor_extras}").to_czmq
if CZMQ::Utils.check_for_pointer(@czmq_obj) &&
CZMQ::Utils.check_for_pointer(extras)
self.class.destructor(extras, @czmq_obj)
end
rescue NoMethodError
end
end
end
end
instance_variables.each do |var|
instance_variable_set(var, nil)
end
true
end
end
private
def setup_finalizer
ObjectSpace.define_finalizer(self, self.class.close_instance(self))
end
def remove_finalizer
ObjectSpace.undefine_finalizer self
end
def self.close_instance(selfie)
Proc.new do
selfie.destructor
end
end
RUBY
end
def czmq_function(name, function, arguments, returns)
fn = "#{czmq_class}_#{function}"
self.module_eval <<-RUBY, __FILE__, __LINE__ + 1
attach_function #{name.inspect}, #{fn.inspect}, #{arguments.inspect}, #{returns.inspect}, :blocking => true
def #{name}(*args)
if instance_variables.include?(:@czmq_obj) && @czmq_obj.nil?
fail "#{czmq_class} is not initialized"
end
if CZMQ::Utils.check_for_pointer(@czmq_obj)
case #{function.inspect}
when :send
z_obj = @owned_by_me ? self : dup
pointer = FFI::MemoryPointer.new(:pointer)
pointer.write_pointer(z_obj.to_czmq)
zsocket = CZMQ::Zsocket.convert(args.first)
result = self.class.#{name}(pointer, zsocket, *args[1..-1])
z_obj.instance_variable_set(:@owned_by_me, false)
z_obj.destructor
when :insert, :append, :prepend
if args.first.respond_to?(:to_czmq) &&
CZMQ::Utils.check_for_pointer(args.first.to_czmq)
if args.first.instance_variable_get(:@owned_by_me)
z_obj = args.first
else
z_obj = args.first.dup
end
czmq_obj = FFI::MemoryPointer.new(:pointer)
czmq_obj.write_pointer(z_obj.to_czmq)
self.class.#{name}(@czmq_obj, czmq_obj)
z_obj.instance_variable_set(:@owned_by_me, false)
z_obj.destructor
else
fail ArgumentError, "Didn't supply a valid Pointer"
end
else
result = self.class.#{name}(@czmq_obj, *args)
end
else
result = self.class.#{name}(*args)
end
case #{returns.inspect}
when :pointer
unless CZMQ::Zctx.interrupted == 1
!result.null? ||fail(CZMQ::Utils.error)
end
result
when :int
unless CZMQ::Zctx.interrupted == 1
if #{function.inspect} == :send
result != -1 ||fail(IOError, CZMQ::Utils.error)
else
result != -1 ||fail(CZMQ::Utils.error)
end
end
result
when :string, :bool, :char
result
when :void
true
else
result
end
end
RUBY
end
end
module CZMQ
class Zauth
# See https://github.com/zeromq/czmq/blob/v2.2.0/doc/zauth.txt
ALLOW_ANY = '*'.freeze
extend ::LibCZMQ
czmq_constructor [:pointer], :zctx
czmq_destructor
czmq_function :allow, :allow, [:pointer, :string], :void
czmq_function :deny, :deny, [:pointer, :string], :void
czmq_function :configure_plain, :configure_plain, [:pointer, :string, :string], :void
czmq_function :configure_curve, :configure_curve, [:pointer, :string, :string], :void
czmq_function :set_verbose, :set_verbose, [:pointer, :bool], :void
end
end
module CZMQ
class Zcert
# See https://github.com/zeromq/czmq/blob/v2.2.0/doc/zcert.txt
extend ::LibCZMQ
czmq_constructor
czmq_destructor
#czmq_function :new_from, :new_from, [:pointer, :pointer], :pointer
czmq_function :public_txt, :public_txt, [:pointer], :string
czmq_function :secret_txt, :secret_txt, [:pointer], :string
czmq_function :load_zcert, :load, [:string], :pointer
czmq_function :save, :save, [:pointer, :string], :int
czmq_function :save_public, :save_public, [:pointer, :string], :int
czmq_function :save_secret, :save_secret, [:pointer, :string], :int
czmq_function :apply, :apply, [:pointer, :pointer], :void
czmq_function :dup_zcert, :dup, [:pointer], :pointer
czmq_function :eq, :eq, [:pointer, :pointer], :bool
# def self.zcert_new_from(public_key, secret_key)
# if @zcert = super
# setup_finalizer
# else
# fail 'Cannot create certificate'
# end
# end
def self.convert(cert)
if Utils.check_for_pointer(cert)
return cert
elsif cert.respond_to?(:to_zcert) &&
Utils.check_for_pointer(cert.to_zcert)
return cert.to_zcert
else
fail ArgumentError, "#{cert.class} is not a CZMQ::Zcert"
end
end
def self.load(filename)
unless (zcert = load_zcert(filename)).null?
new_from_czmq_obj(zcert)
else
fail IOError, CZMQ::Utils.error
end
end
def dup
self.class.new_from_czmq_obj(dup_zcert)
end
def ==(other)
eq(self.class.convert(other))
end
end
end
module CZMQ
class Zcertstore
# See https://github.com/zeromq/czmq/blob/v2.2.0/doc/zcertstore.txt
extend ::LibCZMQ
czmq_constructor [:string]
czmq_destructor
czmq_function :lookup, :lookup, [:pointer, :string], :pointer
czmq_function :insert_zcert, :insert, [:pointer, :pointer], :void
end
end
module CZMQ
class Zctx
# See https://github.com/zeromq/czmq/blob/v2.2.0/doc/zctx.txt
extend ::LibCZMQ
czmq_constructor
czmq_destructor
czmq_function :shadow, :shadow, [:pointer], :pointer
czmq_function :set_iothreads, :set_iothreads, [:pointer, :int], :void
czmq_function :set_linger, :set_linger, [:pointer, :int], :void
czmq_function :set_pipehwm, :set_pipehwm, [:pointer, :int], :void
czmq_function :set_sndhwm, :set_sndhwm, [:pointer, :int], :void
czmq_function :set_rcvhwm, :set_rcvhwm, [:pointer, :int], :void
czmq_function :underlying, :underlying, [:pointer], :pointer
attach_variable :interrupted, :zctx_interrupted, :int
end
end
module CZMQ
class Zframe
include Comparable
# See https://github.com/zeromq/czmq/blob/v2.2.0/doc/zframe.txt
MORE = 1
REUSE = 2
DONTWAIT = 4
extend ::LibCZMQ
czmq_constructor [:pointer, :size_t]
czmq_destructor
czmq_function :new_empty, :new_empty, [], :pointer
czmq_function :recv_zframe_nowait, :recv_nowait, [:pointer], :pointer
czmq_function :recv_zframe, :recv, [:pointer], :pointer
czmq_function :send_zframe, :send, [:pointer, :pointer, :int], :int
czmq_function :size, :size, [:pointer], :size_t
czmq_function :data, :data, [:pointer], :pointer
czmq_function :dup_zframe, :dup, [:pointer], :pointer
czmq_function :str_hex, :strhex, [:pointer], :string
czmq_function :str_eq, :streq, [:pointer, :string], :bool
czmq_function :str_dup, :strdup, [:pointer], :string
czmq_function :more, :more, [:pointer], :int
czmq_function :set_more, :set_more, [:pointer, :int], :void
def self.convert(frame)
if Utils.check_for_pointer(frame)
return frame
elsif frame.respond_to?(:to_zframe) &&
Utils.check_for_pointer(frame.to_zframe)
return frame.to_zframe
else
fail ArgumentError, "#{frame.class} is not a CZMQ::Zframe"
end
end
def dup
self.class.new_from_czmq_obj(dup_zframe)
end
def <=>(other)
size <=> other.size
end
def ==(other)
to_str == other.to_str
end
def to_str
[str_hex].pack('H*')
end
def self.recv(socket)
zsocket = Zsocket.convert(socket)
unless (zframe = recv_zframe(zsocket)).null?
new_from_czmq_obj(zframe)
else
fail IOError, CZMQ::Utils.error
end
end
def self.recv_nowait(socket)
zsocket = Zsocket.convert(socket)
unless (zframe = recv_zframe_nowait(zsocket)).null?
new_from_czmq_obj(zframe)
else
fail IOError, CZMQ::Utils.error
end
end
end
end
module CZMQ
class Zloop
# See https://github.com/zeromq/czmq/blob/v2.2.0/doc/zloop.txt
extend ::LibCZMQ
czmq_constructor
czmq_destructor
czmq_function :poller, :poller, [:pointer, :pointer, :pointer, :pointer], :int
czmq_function :poller_end, :poller_end, [:pointer, :pointer], :void
czmq_function :set_tolerant, :set_tolerant, [:pointer, :pointer], :void
czmq_function :timer, :timer, [:pointer, :size_t, :size_t, :pointer, :pointer], :int
czmq_function :timer_end, :timer_end, [:pointer, :int], :int
czmq_function :set_verbose, :set_verbose, [:pointer, :bool], :void
czmq_function :start, :start, [:pointer], :int
def initialize
@poller_callbacks = []
@timer_callbacks = []
end
def add_poller(pollitem, *args, &block)
poller_callback = FFI::Function.new(:int, [:pointer, :pointer, :pointer], :blocking => true) do |zloopy, pollitem, *args|
zloop = CZMQ::Zloop.new_from_czmq_obj(zloopy, false)
zpollitem = Zpoller::PollItem.new(pollitem)
block.call(zloop, zpollitem, *args)
end
poller(pollitem, poller_callback, *args)
@poller_callbacks << {poller_callback: poller_callback, pollitem: pollitem}
end
def remove_poller(pollitem)
@poller_callbacks.delete_if {|poller| poller[:pollitem] = pollitem }
poller_end(pollitem)
end
def add_timer(delay, times, *args, &block)
timer_callback = FFI::Function.new(:int, [:pointer, :int, :pointer], :blocking => true) do |zloopy, timer_id, *args|
zloop = CZMQ::Zloop.new_from_czmq_obj(zloopy, false)
block.call(zloop, timer_id, *args)
end
timer_id = timer(delay, times, timer_callback, *args)
@timer_callbacks << {timer_callback: timer_callback, timer_id: timer_id}
timer_id
end
def remove_timer(timer_id)
@timer_callbacks.delete_if { |timer| timer[:timer_id] = timer_id }
timer_end(timer_id)
end
end
end
module CZMQ
class Zmsg
include Enumerable
# See https://github.com/zeromq/czmq/blob/v2.2.0/doc/zmsg.txt
extend ::LibCZMQ
czmq_constructor
czmq_destructor
czmq_function :recv_zmsg, :recv, [:pointer], :pointer
czmq_function :recv_zmsg_nowait, :recv_nowait, [:pointer], :pointer
czmq_function :send_zmsg, :send, [:pointer, :pointer], :int
czmq_function :size, :size, [:pointer], :size_t
czmq_function :content_size, :content_size, [:pointer], :size_t
czmq_function :prepend_zframe, :prepend, [:pointer, :pointer], :int
czmq_function :append_zframe, :append, [:pointer, :pointer], :int
czmq_function :pop_zframe, :pop, [:pointer], :pointer
czmq_function :push_mem, :pushmem, [:pointer, :pointer, :size_t], :int
czmq_function :add_mem, :addmem, [:pointer, :pointer, :size_t], :int
czmq_function :push_zstr, :pushstr, [:pointer, :string], :int
czmq_function :add_zstr, :addstr, [:pointer, :string], :int
czmq_function :pop_zstr, :popstr, [:pointer], :string
czmq_function :unwrap_zframe, :unwrap, [:pointer], :pointer
czmq_function :remove_zframe, :remove, [:pointer, :pointer], :void
czmq_function :first_zframe, :first, [:pointer], :pointer
czmq_function :next_zframe, :next, [:pointer], :pointer
czmq_function :last_zframe, :last, [:pointer], :pointer
czmq_function :dup_zmsg, :dup, [:pointer], :pointer
def each
yield first
((size) -1).times do |i|
yield self.next
end
end
def to_a
ary = []
each {|frame| ary << frame.to_str }
ary
end
def add(data)
case data
when String
add_string(data)
when Zframe
append_zframe(data)
else
if (data.respond_to?(:data) && data.respond_to?(:size))
add_mem(data.data, data.size)
elsif data.respond_to?(:to_str)
add_string(data.to_str)
else
fail ArgumentError, 'Can only add Strings and CZMQ::Zframes'
end
end
end
alias_method :<<, :add
def push(data)
case data
when String
push_string(data)
when Zframe
prepend_zframe(data)
else
if (data.respond_to?(:data) && data.respond_to?(:size))
push_mem(data.data, data.size)
elsif data.respond_to?(:to_str)
push_string(data.to_str)
else
fail ArgumentError, 'Can only push Strings and CZMQ::Zframes'
end
end
end
[:first, :next, :last].each do |meth|
self.class_eval <<-RUBY, __FILE__, __LINE__ + 1
def #{meth.to_s}
CZMQ::Zframe.new_from_czmq_obj(#{meth.to_s}_zframe, false)
end
RUBY
end
[:pop, :unrwap].each do |meth|
self.class_eval <<-RUBY, __FILE__, __LINE__ + 1
def #{meth.to_s}
CZMQ::Zframe.new_from_czmq_obj(#{meth.to_s}_zframe)
end
RUBY
end
def dup
self.class.new_from_czmq_obj(dup_zmsg)
end
def remove(zframe)
remove_zframe(Zframe.convert(zframe))
end
def self.recv(socket)
zsocket = Zsocket.convert(socket)
unless (zmsg = recv_zmsg(zsocket)).null?
new_from_czmq_obj(zmsg)
else
fail IOError, CZMQ::Utils.error
end
end
def self.recv_nowait(socket)
zsocket = Zsocket.convert(socket)
unless (zmsg = recv_zmsg_nowait(zsocket)).null?
new_from_czmq_obj(zmsg)
else
fail IOError, CZMQ::Utils.error
end
end
private
def add_string(string)
if string.encoding == Encoding::ASCII_8BIT
add_mem(string, string.size)
else
add_zstr(string)
end
end
def push_string(string)
if string.encoding == Encoding::ASCII_8BIT
push_mem(string, string.size)
else
push_zstr(string)
end
end
end
end
module CZMQ
class Zmonitor
CONNECTED = 0x0001
CONNECT_DELAYED = 0x0002
CONNECT_RETRIED = 0x0004
LISTENING = 0x0008
BIND_FAILED = 0x0010
ACCEPTED = 0x0020
ACCEPT_FAILED = 0x0040
CLOSED = 0x0080
CLOSE_FAILED = 0x0100
DISCONNECTED = 0x0200
MONITOR_STOPPED = 0x0400
ALL = 0xFFFF
extend ::LibCZMQ
czmq_constructor [:pointer, :pointer, :int], :zctx
czmq_destructor
czmq_function :recv, :recv, [:pointer], :pointer
czmq_function :socket, :socket, [:pointer], :pointer
czmq_function :set_verbose, :set_verbose, [:pointer, :bool], :void
def to_pollitem(polling_type = Zpoller::POLLIN)
Zpoller.create_pollitem(socket: socket, events: polling_type)
end
end
end
module CZMQ
class Zpoller
# See https://github.com/zeromq/czmq/blob/v2.2.0/doc/zpoller.txt
extend ::LibCZMQ
POLL = 1
POLLIN = 1
POLLOUT = 2
POLLERR = 4
class PollItem < FFI::Struct
FD_TYPE = if FFI::Platform::IS_WINDOWS && FFI::Platform::ADDRESS_SIZE == 64
# On Windows, zmq.h defines fd as a SOCKET, which is 64 bits on x64.
:uint64
else
:int
end
layout :socket, :pointer,
:fd, FD_TYPE,
:events, :short,
:revents, :short
def readable?
(self[:revents] & POLLIN) > 0
end
def writable?
(self[:revents] & POLLOUT) > 0
end
def error?
(self[:revents] & POLLERR) > 0
end
def inspect
"socket [#{self[:socket]}], fd [#{self[:fd]}], events [#{self[:events]}], revents [#{self[:revents]}]"
end
end
def self.create_pollitem(args={})
pi = PollItem.new
pi[:socket] = args[:socket]
pi[:fd] = args[:fd] || 0
pi[:events] = args[:events] || 0
pi[:revents] = args[:revents] || 0
pi
end
czmq_constructor [:varargs]
czmq_destructor
czmq_function :add, :add, [:pointer, :pointer], :int
czmq_function :wait, :wait, [:pointer, :int], :pointer
czmq_function :expired, :expired, [:pointer], :bool
czmq_function :terminated, :terminated, [:pointer], :bool
end
end
module CZMQ
class Zproxy
# See https://github.com/zeromq/czmq/blob/v2.2.0/doc/zproxy.txt
extend ::LibCZMQ
czmq_constructor [:pointer, :pointer, :pointer], :zctx
czmq_destructor
end
end
module CZMQ
SET_SOCKOPT = /^set_(.+)$/.freeze
class Zsocket
# See https://github.com/zeromq/czmq/blob/v2.2.0/doc/zsocket.txt
PAIR = 0
PUB = 1
SUB = 2
REQ = 3
REP = 4
XREQ = 5
XREP = 6
PULL = 7
PUSH = 8
XPUB = 9
XSUB = 10
DEALER = XREQ
ROUTER = XREP
STREAM = 11
extend ::LibCZMQ
czmq_constructor [:pointer, :int], :zctx
czmq_destructor :zctx
czmq_function :bind, :bind, [:pointer, :string], :int
czmq_function :unbind, :unbind, [:pointer, :string], :int
czmq_function :connect, :connect, [:pointer, :string], :int
czmq_function :disconnect, :disconnect, [:pointer, :string], :int
czmq_function :poll, :poll, [:pointer, :int], :bool
czmq_function :type_str, :type_str, [:pointer], :string
czmq_function :send_mem, :sendmem, [:pointer, :pointer, :size_t, :int], :int
czmq_function :signal, :signal, [:pointer], :int
czmq_function :wait, :wait, [:pointer], :int
def to_pollitem(polling_type = Zpoller::POLLIN)
Zpoller.create_pollitem(socket: @czmq_obj, events: polling_type)
end
def self.convert(socket)
if Utils.check_for_pointer(socket)
return socket
elsif socket.respond_to?(:to_zsocket) &&
Utils.check_for_pointer(socket.to_zsocket)
return socket.to_zsocket
else
fail ArgumentError, "#{socket.class} is not a CZMQ::Zsocket"
end
end
# See https://github.com/zeromq/czmq/blob/v2.2.0/doc/zsockopt.txt
def method_missing(meth, *args, &blk)
if args.length == 1 &&
meth.to_s =~ SET_SOCKOPT &&
(args.first.is_a?(Integer) || args.first.is_a?(String))
begin
self.class.instance_eval <<-RUBY, __FILE__, __LINE__ +1
attach_function #{meth.inspect}, "zsocket_#{meth.to_s}", [:pointer, :varargs], :void
RUBY
rescue FFI::NotFoundError
super
else
if args.first.is_a?(Integer)
self.class.class_eval <<-RUBY, __FILE__, __LINE__ + 1
def #{meth.to_s}(arg)
self.class.#{meth.to_s}(@czmq_obj, :int, arg)
end
RUBY
elsif args.first.is_a?(String)
self.class.class_eval <<-RUBY, __FILE__, __LINE__ + 1
def #{meth.to_s}(arg)
self.class.#{meth.to_s}(@czmq_obj, :string, arg)
end
RUBY
end
send meth, args.first
end
else
super
end
end
end
end
module CZMQ
class Zstr
# See https://github.com/zeromq/czmq/blob/v2.2.0/doc/zstr.txt
extend ::LibCZMQ
czmq_destructor nil, false
czmq_function :recv_zstr_nowait, :recv_nowait, [:pointer], :pointer
czmq_function :recv_zstr, :recv, [:pointer], :pointer
czmq_function :send_zstr, :send, [:pointer, :string], :int
czmq_function :sendm, :sendm, [:pointer, :string], :int
def initialize(socket)
@zsocket = Zsocket.convert(socket)
end
def recv
Utils.read_string(self.class.recv_zstr(@zsocket))
end
def recv_nowait
Utils.read_string(self.class.recv_zstr_nowait(@zsocket))
end
def send_zstr(str)
result = self.class.send_zstr(@zsocket, str)
result != -1 ||fail(IOError, CZMQ::Utils.error)
result
end
def sendm(str)
result = self.class.sendm(@zsocket, str)
result != -1 ||fail(IOError, CZMQ::Utils.error)
result
end
end
end
module CZMQ
class Zsys
# See https://github.com/zeromq/czmq/blob/v2.2.0/doc/zsys.txt
extend ::LibCZMQ
czmq_function :handler_set, :handler_set, [:pointer], :void
czmq_function :handler_reset, :handler_reset, [], :void
czmq_function :set_interface, :set_interface, [:string], :void
czmq_function :interface, :interface, [], :string
czmq_function :socket_error, :socket_error, [:string], :void
end
end
module CZMQ
class Zthread
# See https://github.com/zeromq/czmq/blob/v2.2.0/doc/zthread.txt
extend ::LibCZMQ
czmq_destructor nil, false
czmq_function :fork_zthread, :fork, [:pointer, :pointer, :pointer], :pointer
czmq_function :new_zthread, :new, [:pointer, :pointer], :int
def fork_zthread(zthread_attached_fn, *args)
self.class.fork_zthread(CZMQ.context.to_zctx, zthread_attached_fn, *args)
end
def fork_callback(*args, &block)
@fork_callback = FFI::Function.new(:void, [:pointer, :pointer, :pointer], :blocking => true) do |*args, zctxy, pipey|
zctx = CZMQ::Zctx.new_from_czmq_obj(zctxy)
pipe = CZMQ::Zsocket.new_from_czmq_obj(pipey)
block.call(*args, zctx, pipe)
end
CZMQ::Zsocket.new_from_czmq_obj(fork_zthread(@fork_callback, *args))
end
def new_callback(*args, &block)
@new_callback = FFI::Function.new(:void, [:pointer], :blocking => true) do |*args|
block.call(*args)
end
new_zthread(@new_callback, *args)
end
end
end
module CZMQ
class Zuuid
extend ::LibCZMQ
czmq_constructor
czmq_destructor
czmq_function :data, :data, [:pointer], :pointer
czmq_function :size, :size, [:pointer], :size_t
czmq_function :to_str, :str, [:pointer], :string
#czmq_function :set, :set, [:pointer, :pointer], :void
#czmq_function :export, :export, [:pointer, :pointer], :void
czmq_function :eq, :eq, [:pointer, :pointer], :bool
czmq_function :neq, :neq, [:pointer, :pointer], :bool
end
end
module CZMQ
class << self
def init
::Thread.exclusive do
return @context if @context
CZMQ::Zsys.handler_set(nil)
@context = CZMQ::Zctx.new
# this should be miliseconds
@context.set_linger(64)
@context
end
end
def context
fail 'you must initialize CZMQ by calling CZMQ.init' unless @context
@context
end
end
at_exit do
if @context
@context.destructor
@context = nil
end
::Thread.exclusive do
CZMQ::Zctx.interrupted = 1
CZMQ::Zsys.handler_reset
end
end
end
source 'https://rubygems.org'
gem 'pry'
platforms :ruby do
gem 'msgpack'
end
platforms :jruby do
gem 'msgpack-jruby', :require => 'msgpack'
end
gem 'ffi'
gem 'concurrent-ruby'
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment