Last active

Embed URL

HTTPS clone URL

SSH clone URL

You can clone with HTTPS or SSH.

Download Gist

Reel Websocket Server using PG Listen/Notify for crude pubsub

View reel_ws_pg_example.rb

require 'rubygems'
require 'bundler/setup'
require 'reel'
require 'celluloid/io'
require 'pg'
 
# Mixin that gives an object a crude pub/sub facility built on PostgreSQL
# LISTEN/NOTIFY.
#
# Including the module also includes Celluloid::IO into the host class. The
# host must provide an `info` logging method (the classes in this file get it
# from Celluloid::Logger) because start_listening logs through it.
module PGNotifications
  def self.included(actor)
    actor.send(:include, Celluloid::IO)
  end

  # Registered handlers: channel name (String) => name of the method to call.
  def actions
    @actions ||= {}
  end

  # Connection to the 'ws_example' database, opened on first use and then
  # reused by this object.
  def pg_connection
    @pg_connection ||= PG.connect(dbname: 'ws_example')
  end

  # Publishes +value+ on +channel+.
  #
  # Both arguments are sent as bind parameters to pg_notify, so quotes or
  # other SQL metacharacters in the payload can neither break the statement
  # nor inject SQL. Values are stringified first (callers in this file pass
  # an Integer timestamp as well as Strings).
  def notify(channel, value)
    pg_connection.exec_params('SELECT pg_notify($1, $2)', [channel.to_s, value.to_s])
  end

  # Subscribes to +channel+ and remembers +action+ (a method name) as the
  # handler invoked with (channel, payload) for each notification.
  #
  # The channel is quoted as an identifier so that the name PostgreSQL reports
  # back is exactly the key stored in #actions.
  def listen(channel, action)
    name = channel.to_s
    actions[name] = action
    pg_connection.exec("LISTEN #{pg_connection.quote_ident(name)}")
  end

  # Runs the notification loop, dispatching every notification to the handler
  # registered for its channel. Does not return until #stop_listening has been
  # called and the loop has woken up once more.
  def start_listening
    info "Starting Listening"

    @listening = true

    wait_for_notify do |channel, pid, payload|
      info "Received notification: #{[channel, pid, payload].inspect}"

      action = actions[channel]
      if action
        send(action, channel, payload)
      else
        # A notification can still be queued for a channel whose handler was
        # removed; calling send(nil, ...) would raise, so just report it.
        info "No action registered for channel: #{channel}"
      end
    end
  end

  # Asks the notification loop to exit; it is only re-checked after the
  # current wait for input finishes.
  def stop_listening
    @listening = false
  end

  # Waits for the connection's socket to become readable, then yields
  # (channel, backend pid, payload) for every pending notification, repeating
  # for as long as @listening is truthy.
  def wait_for_notify(&block)
    io = pg_connection.socket_io

    while @listening
      Celluloid::IO.wait_readable(io) # blocks execution, but unblocks this actor
      pg_connection.consume_input     # fetch any input on this connection

      while (notification = pg_connection.notifies)
        block.call(
          notification[:relname], # channel
          notification[:be_pid],  # pid
          notification[:extra]    # payload
        )
      end
    end
  end

  # Unsubscribes from +channel+ and forgets its handler.
  #
  # @listening is the boolean flag that drives the notification loop, so it is
  # deliberately left untouched here; use #stop_listening to end the loop.
  def unlisten(channel)
    name = channel.to_s
    actions.delete(name)

    pg_connection.exec("UNLISTEN #{pg_connection.quote_ident(name)}")
  end
end
 
# Actor that publishes the current time on the 'time_change' channel once a
# second and a Unix timestamp on the 'keepalive' channel every ten seconds.
class TimeServer
  include Celluloid
  include PGNotifications
  include Celluloid::Logger

  # Kicks off #run asynchronously so construction returns immediately.
  def initialize
    async.run
  end

  # Waits until just past the next whole second, then registers the two
  # recurring publishers.
  def run
    started_at = Time.now.to_f
    until_next_second = started_at.ceil - started_at
    sleep(until_next_second + 0.001)

    every(1) do
      notify('time_change', Time.now.inspect)
    end
    info 'Registered to send time_change'

    every(10) do
      notify('keepalive', Time.now.to_i)
    end
    info 'Registered to send keepalive'
  end
end
 
# Per-WebSocket actor: subscribes to the 'time_change' and 'keepalive'
# channels and forwards every time change to the connected socket.
#
# Each instance opens its own PostgreSQL connection (via PGNotifications), so
# a finalizer closes it when the actor goes away instead of leaving it open
# until garbage collection.
class TimeClient
  include PGNotifications
  include Celluloid::Logger

  KEEPALIVE_LIMIT = 30 # keep it short

  finalizer :release_pg_connection

  # socket - the WebSocket that time changes are written to.
  def initialize(socket)
    info "Streaming time changes to client"
    @socket = socket
    @limit = Time.now.to_i + KEEPALIVE_LIMIT

    # start_listening runs the notification loop, so it is queued
    # asynchronously together with the two subscriptions.
    async.start_listening
    async.listen('time_change', :notify_time_change)
    async.listen('keepalive', :report_keepalive)
  end

  # Handler for 'time_change': pushes the new time to the socket. When the
  # socket has gone away, unsubscribes and terminates this actor.
  def notify_time_change(channel, new_time)
    @socket << new_time
  rescue Reel::SocketError
    info "Time client disconnected"
    unlisten(channel)
    terminate
  end

  # Handler for 'keepalive': only logs the received timestamp.
  def report_keepalive(channel, timestamp)
    info "Current keepalive timestamp: #{timestamp}"
    # unlisten(channel) if timestamp.to_i > @limit
  end

  # Finalizer: closes this client's PostgreSQL connection if one was opened.
  # Reads the instance variable directly so that shutting down never opens a
  # fresh connection just to close it.
  def release_pg_connection
    connection = @pg_connection
    @pg_connection = nil
    connection.close if connection && !connection.finished?
  end
end
 
# HTTP/WebSocket front end: serves the demo page at "/" and hands WebSocket
# connections for "/timeinfo" to a new TimeClient.
class WebServer < Reel::Server::HTTP
  include Celluloid::Logger

  def initialize(host = "127.0.0.1", port = 1234)
    info "Time server example starting on #{host}:#{port}"
    super(host, port, &method(:on_connection))
  end

  # Routes every request on the connection to the WebSocket or plain HTTP
  # handler.
  def on_connection(connection)
    connection.each_request do |request|
      next handle_request(request) unless request.websocket?

      info "Received a WebSocket connection"
      handle_websocket request.websocket
    end
  end

  # Serves the index page for "/"; anything else gets a 404.
  def handle_request(request)
    return render_index(request) if request.url == "/"

    info "404 Not Found: #{request.path}"
    request.respond :not_found, "Not found"
  end

  # Starts streaming for "/timeinfo"; closes any other WebSocket.
  def handle_websocket(socket)
    return TimeClient.new(socket) if socket.url == "/timeinfo"

    info "Received invalid WebSocket request for: #{socket.url}"
    socket.close
  end

  # Responds with the demo page, whose script opens a WebSocket to /timeinfo
  # and writes each message into the #current-time element.
  def render_index(request)
    info "200 OK: /"
    request.respond(:ok, <<-HTML)
<!doctype html>
<html lang="en">
<head>
<meta charset="utf-8">
<title>Reel WebSockets time server example</title>
<style>
body {
font-family: "HelveticaNeue-Light", "Helvetica Neue Light", "Helvetica Neue", Helvetica, Arial, "Lucida Grande", sans-serif;
font-weight: 300;
text-align: center;
}
#content {
width: 800px;
margin: 0 auto;
background: #EEEEEE;
padding: 1em;
}
</style>
</head>
<script>
var SocketKlass = "MozWebSocket" in window ? MozWebSocket : WebSocket;
var ws = new SocketKlass('ws://' + window.location.host + '/timeinfo');
ws.onmessage = function(msg){
document.getElementById('current-time').innerHTML = msg.data;
}
</script>
<body>
<div id="content">
<h1>Time Server Example</h1>
<div>The time is now: <span id="current-time">...</span></div>
</div>
</body>
</html>
    HTML
  end
end
 
# Start both actors under supervision, in the same order as before: the time
# publisher as :time_server, then the web front end as :reel.
[[TimeServer, :time_server], [WebServer, :reel]].each do |actor_class, registered_name|
  actor_class.supervise_as(registered_name)
end

# Park the main thread; sleep without an argument never returns on its own.
sleep
Owner
tpitale commented

Now works with the latest Reel release at the time of writing (0.5.0).

The number of simultaneous clients seems to be limited only by the number of available PostgreSQL connections.

Owner
tpitale commented

Revision 14eeea263bb0320c2cedaee36ec6d792e9b1e492 now supports multiple listens per actor.

I've recreated the wait_for_notify from the pg gem and enhanced it to work with Celluloid::IO.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Something went wrong with that request. Please try again.