Skip to content

Instantly share code, notes, and snippets.

@kballenegger
Created June 7, 2012 03:28
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kballenegger/f2990e73b914ec46d696 to your computer and use it in GitHub Desktop.
Save kballenegger/f2990e73b914ec46d696 to your computer and use it in GitHub Desktop.
require 'eventmachine'
require 'em-websocket'
require 'json'
# require 'zmq'
require 'em-zeromq'
module Realtime
# constants
# Upper bound on how many payloads MessageBuffer#read keeps per metric; once a
# metric's list grows past this, the oldest entry is shifted off.
MessageBufferSize = 100
# NOTE(review): this context is not referenced anywhere else in the code visible
# here (the reactor bootstrap builds its own EM::ZeroMQ::Context) — confirm
# whether it is still needed.
Zmqctx = ZMQ::Context.new(1)
# zmq
# em-zeromq socket handler that feeds every incoming message into a buffer.
class ZMQMessageHandler
  # buffer - any object responding to #read(String); receives the raw message body.
  def initialize(buffer)
    @buffer = buffer
  end

  # Callback invoked when the socket has a message ready.
  #
  # socket - the socket that became readable (unused).
  # parts  - the list of message parts; only the first one is consumed.
  def on_readable(socket, parts)
    # I don't support multi-part atm
    puts "reading??"
    part = parts[0]
    # Nothing to forward for an empty delivery.
    return if part.nil?

    # NOTE(review): em-zeromq's README reads parts via #copy_out_string (they are
    # message objects, not Strings) — confirm against the installed version.
    # Plain strings are passed through untouched so either shape works.
    body = part.respond_to?(:copy_out_string) ? part.copy_out_string : part
    @buffer.read(body)
  end
end
# Keeps the most recent payloads for each metric and notifies subscribers
# whenever a new payload for a metric they registered for is read.
class MessageBuffer
  def initialize
    @metrics = {}        # metric name => Array of parsed payloads, oldest first
    @subscriptions = {}  # subscriber => { metric name => callback }
  end

  # Parses +data+ as JSON, stores it under its 'metric' key and notifies the
  # subscribers of that metric.
  #
  # data - a JSON document; anything that is not a JSON object carrying a
  #        'metric' entry is ignored, as is malformed JSON.
  #
  # Each matching callback is called with the parsed payload (a Hash).
  def read(data)
    puts "xmq data #{data}"
    payload = JSON.parse(data)
    # Valid JSON is not necessarily an object ([], null, 42, ...); indexing
    # those with a String key would raise instead of being skipped.
    return unless payload.is_a?(Hash) && payload['metric']

    metric = payload['metric']
    history = (@metrics[metric] ||= [])
    history << payload
    history.shift if history.count > MessageBufferSize

    # notify subscribers
    @subscriptions.each_value do |metrics|
      puts "checking metrics #{metrics[metric]}"
      callback = metrics[metric]
      # Hand the payload over; a bare #call would leave the subscriber with nil.
      callback.call(payload) if callback
    end
  rescue JSON::JSONError
    # Malformed input is dropped on purpose: one bad message must not stop the feed.
  end

  # Returns the buffered payloads for +metric+ (oldest first), or nil when
  # nothing has been read for it yet.
  def get(metric)
    @metrics[metric]
  end

  # Registers +block+ to be called with each new payload of +metric+.
  # A later subscription by the same subscriber to the same metric replaces
  # the earlier callback.
  def subscribe(subscriber, metric, &block)
    puts "subscribing #{subscriber} to metric #{metric}"
    @subscriptions[subscriber] ||= {}
    @subscriptions[subscriber][metric] = block
  end

  # no second argument means the entire subscriber gets unsubscribed
  def unsubscribe(subscriber, metric = nil)
    return unless @subscriptions[subscriber]

    if metric
      @subscriptions[subscriber].delete(metric)
    else
      @subscriptions.delete(subscriber)
    end
  end
end
# Single shared buffer: the ZMQ handler is constructed with it and every
# websocket Connection subscribes/unsubscribes through it.
MessageBufferInstance = MessageBuffer.new
# web sockets
# One websocket client. Translates the client's JSON requests into
# subscriptions on MessageBufferInstance and pushes metric payloads back
# down the socket.
class Connection
  # socket - the websocket object; must respond to #send(String).
  def initialize(socket)
    @socket = socket
  end

  def opened
    puts "-- someone connected to the echo server!"
  end

  # Handles one message from the client.
  #
  # data - JSON text of the form {"action": "subscribe"|"unsubscribe", "metric": ...}.
  #        Malformed JSON, non-object JSON, unknown actions and requests
  #        without a metric are ignored.
  def received(data)
    request = JSON.parse(data)
    # Client input is untrusted: valid JSON such as [] or 42 is not a request.
    return unless request.is_a?(Hash)

    metric = request['metric']
    return unless metric

    case request['action']
    when 'subscribe'
      MessageBufferInstance.subscribe(self, metric) do |message|
        puts "hello"
        # Tolerate being invoked without an argument by falling back to the
        # newest payload buffered for this metric.
        message ||= Array(MessageBufferInstance.get(metric)).last
        # The socket carries text frames, so serialize anything that is not
        # already a String.
        @socket.send(message.is_a?(String) ? message : JSON.generate(message)) unless message.nil?
        puts "world"
      end
    when 'unsubscribe'
      MessageBufferInstance.unsubscribe(self, metric)
    end
  rescue JSON::JSONError
    # Unparseable client input is dropped on purpose.
  end

  def closed
    puts "-- someone disconnected from the echo server!"
    # Drop every subscription of this client; otherwise the buffer keeps the
    # callbacks (and this socket) alive and keeps sending to a closed socket.
    MessageBufferInstance.unsubscribe(self)
  end
end
end
# Boot sequence: connect a ZeroMQ PULL socket that feeds the shared message
# buffer, then accept websocket clients and wire each one to a Connection.
ctx = EM::ZeroMQ::Context.new(1)

EM.run do
  feed_handler = Realtime::ZMQMessageHandler.new(Realtime::MessageBufferInstance)
  pull_socket = ctx.socket(ZMQ::PULL, feed_handler)
  pull_socket.connect('tcp://127.0.0.1:13002')

  listen_options = { :host => "0.0.0.0", :port => 7331 }
  EventMachine::WebSocket.start(listen_options) do |client_socket|
    # One Connection per websocket client; its three lifecycle events are
    # forwarded one-to-one.
    client = Realtime::Connection.new(client_socket)
    client_socket.onopen { client.opened }
    client_socket.onmessage { |message| client.received(message) }
    client_socket.onclose { client.closed }
  end
end
# TODO: this is a nasty hack. not okay
# Hard-exits the process at shutdown (presumably so ZeroMQ teardown cannot keep
# it alive — see the original remark below). A bare Process.exit! defaults to a
# failure status, so the status the process was already exiting with is
# forwarded explicitly: 0 for a normal end, the requested code for `exit n`,
# 1 for an uncaught exception.
at_exit do
  pending = $! # exception that is terminating the process, if any
  status =
    case pending
    when nil then 0
    when SystemExit then pending.status
    else
      # exit! bypasses Ruby's own uncaught-exception report, so emit one here.
      warn "#{pending.class}: #{pending.message}"
      1
    end
  # exit! skips the normal shutdown path, so push out buffered output first.
  $stdout.flush
  $stderr.flush
  Process.exit!(status) # force kill stupid zmq
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment