-
-
Save kballenegger/f2990e73b914ec46d696 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'eventmachine' | |
require 'em-websocket' | |
require 'json' | |
# require 'zmq' | |
require 'em-zeromq' | |
module Realtime | |
# constants | |
MessageBufferSize = 100 | |
Zmqctx = ZMQ::Context.new(1) | |
# zmq | |
# Socket handler that forwards the first frame of every incoming ZeroMQ
# message to a buffer object.
class ZMQMessageHandler
  # buffer - any object responding to #read(data).
  def initialize(buffer)
    @buffer = buffer
  end

  # Called when the socket has data to read.
  #
  # socket - the socket that became readable (unused).
  # parts  - the frames of the received message; only the first is used.
  def on_readable(socket, parts)
    # I don't support multi-part atm
    puts "reading??"
    frame = Array(parts).first
    return if frame.nil? # nothing was delivered, so there is nothing to parse

    # NOTE(review): depending on the em-zeromq version, frames may be message
    # objects exposing #copy_out_string rather than plain Strings - confirm
    # against the installed gem. Plain Strings are passed through untouched.
    data = frame.respond_to?(:copy_out_string) ? frame.copy_out_string : frame
    @buffer.read(data)
  end
end
# Keeps a bounded history of decoded messages per metric and notifies
# subscribers whenever a new message for one of their metrics is read.
class MessageBuffer
  # limit - maximum number of payloads kept per metric. Defaults to the
  #         MessageBufferSize constant.
  def initialize(limit = MessageBufferSize)
    @limit = limit
    @metrics = {}
    @subscriptions = {}
  end

  # Decodes one JSON document, stores it under its 'metric' key and invokes
  # every callback subscribed to that metric with the decoded payload.
  #
  # data - a JSON string. Malformed JSON, nil, JSON that is not an object and
  #        objects without a 'metric' key are ignored.
  def read(data)
    puts "xmq data #{data}"
    return if data.nil?

    payload = JSON.parse(data)
    # Valid JSON may still be an array or a scalar, which cannot carry a metric.
    return unless payload.is_a?(Hash) && payload['metric']

    metric = payload['metric']
    history = (@metrics[metric] ||= [])
    history << payload
    history.shift if history.count > @limit # drop the oldest entry once over the limit

    # notify subscribers
    @subscriptions.each_value do |callbacks|
      puts "checking metrics #{callbacks[metric]}"
      callback = callbacks[metric]
      # Pass the payload along; subscribers need it to know what arrived.
      callback.call(payload) if callback
    end
  rescue JSON::JSONError
    # Input that is not valid JSON is dropped on purpose.
  end

  # Returns the stored payloads for metric (oldest first), or nil when
  # nothing has been stored for it.
  def get(metric)
    @metrics[metric]
  end

  # Registers block as the callback for metric on behalf of subscriber,
  # replacing any callback that subscriber already had for the same metric.
  def subscribe(subscriber, metric, &block)
    puts "subscribing #{subscriber} to metric #{metric}"
    @subscriptions[subscriber] ||= {}
    @subscriptions[subscriber][metric] = block
  end

  # no second argument means the entire subscriber gets unsubscribed
  def unsubscribe(subscriber, metric = nil)
    return unless @subscriptions[subscriber]

    if metric
      @subscriptions[subscriber].delete(metric)
    else
      @subscriptions.delete(subscriber)
    end
  end
end
MessageBufferInstance = MessageBuffer.new | |
# web sockets | |
# One web socket client. Translates the client's JSON requests into
# subscriptions on MessageBufferInstance and pushes matching messages back
# over the socket as JSON text.
class Connection
  # socket - the web socket object; must respond to #send(string).
  def initialize(socket)
    @socket = socket
  end

  def opened
    puts "-- someone connected to the echo server!"
  end

  # Handles one text frame from the client.
  #
  # data - a JSON object with an 'action' of 'subscribe' or 'unsubscribe'
  #        and a 'metric'. Anything else (malformed JSON, non-object JSON,
  #        unknown action, missing metric) is ignored.
  def received(data)
    request = JSON.parse(data)
    return unless request.is_a?(Hash)

    metric = request['metric']
    return unless metric

    case request['action']
    when 'subscribe'
      MessageBufferInstance.subscribe(self, metric) do |message|
        # The buffer may invoke this callback without an argument; in that
        # case send the newest sample it holds for the metric.
        message ||= newest_sample(metric)
        next unless message

        puts "hello"
        # The socket transports text, so serialise structured payloads.
        @socket.send(message.is_a?(String) ? message : JSON.generate(message))
        puts "world"
      end
    when 'unsubscribe'
      MessageBufferInstance.unsubscribe(self, metric)
    end
  rescue JSON::JSONError
    # Requests that are not valid JSON are dropped on purpose.
  end

  def closed
    puts "-- someone disconnected from the echo server!"
    # Drop every subscription of this client so the buffer stops calling
    # back into a socket that no longer exists.
    MessageBufferInstance.unsubscribe(self)
  end

  private

  # Returns the most recent payload buffered for metric, or nil if none.
  def newest_sample(metric)
    history = MessageBufferInstance.get(metric)
    history && history.last
  end
end
end | |
# TODO: this is a nasty hack. not okay
#
# Force the process to terminate instead of going through the regular
# shutdown sequence (force kill stupid zmq). The hook is registered before
# the reactor starts so that it is also in place when EM.run is aborted by an
# exception or an interrupt, and because at_exit hooks run in reverse
# registration order it fires after any hook registered later.
at_exit do
  error = $!
  status = 0
  if error.is_a?(SystemExit)
    status = error.status # keep the status requested via exit(...)
  elsif error
    # exit! ends the process before Ruby prints its own report for an
    # uncaught exception, so print it here.
    warn "#{error.class}: #{error.message}"
    Array(error.backtrace).each { |frame| warn "\tfrom #{frame}" }
    status = 1
  end
  $stdout.flush
  $stderr.flush
  # A bare exit! reports failure; pass the real status explicitly.
  Process.exit!(status)
end

ctx = EM::ZeroMQ::Context.new(1)

EM.run do
  # Pull messages from the local ZeroMQ endpoint into the shared buffer.
  handler = Realtime::ZMQMessageHandler.new(Realtime::MessageBufferInstance)
  pull_socket = ctx.socket(ZMQ::PULL, handler)
  pull_socket.connect('tcp://127.0.0.1:13002')

  # Serve web socket clients; each gets its own Connection wrapper.
  EventMachine::WebSocket.start(:host => "0.0.0.0", :port => 7331) do |ws|
    connection = Realtime::Connection.new(ws)
    ws.onopen { connection.opened }
    ws.onmessage { |data| connection.received data }
    ws.onclose { connection.closed }
  end
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment