Create a gist now

Instantly share code, notes, and snippets.

What would you like to do?
Reel Websocket Server using PG Listen/Notify for crude pubsub
require 'rubygems'
require 'bundler/setup'
require 'reel'
require 'celluloid/io'
require 'pg'
module PGNotifications
def self.included(actor)
actor.send(:include, Celluloid::IO)
end
def actions
@actions ||= {}
end
def pg_connection
@pg_connection ||= PG.connect( dbname: 'ws_example' )
end
def notify(channel, value)
pg_connection.exec("NOTIFY #{channel}, '#{value}';")
end
def listen(channel, action)
actions[channel] = action
pg_connection.exec("LISTEN #{channel}")
end
def start_listening
info "Starting Listening"
@listening = true
wait_for_notify do |channel, pid, payload|
info "Received notification: #{[channel, pid, payload].inspect}"
send(actions[channel], channel, payload)
end
end
def stop_listening
@listening = false
end
def wait_for_notify(&block)
io = pg_connection.socket_io
while @listening do
Celluloid::IO.wait_readable(io) # blocks execution, but unblocks this actor
pg_connection.consume_input # fetch any input on this connection
while notification = pg_connection.notifies do
block.call(*[
notification[:relname], # channel
notification[:be_pid], # pid
notification[:extra] # payload
])
end
end
end
def unlisten(channel)
(@listening ||= {})[channel] = false
pg_connection.exec("UNLISTEN #{channel}")
end
end
class TimeServer
include Celluloid
include PGNotifications
include Celluloid::Logger
def initialize
async.run
end
def run
now = Time.now.to_f
sleep now.ceil - now + 0.001
every(1) { notify 'time_change', Time.now.inspect }
info 'Registered to send time_change'
every(10) { notify 'keepalive', Time.now.to_i }
info 'Registered to send keepalive'
end
end
class TimeClient
include PGNotifications
include Celluloid::Logger
KEEPALIVE_LIMIT = 30 # keep it short
def initialize(socket)
info "Streaming time changes to client"
@socket = socket
@limit = Time.now.to_i + KEEPALIVE_LIMIT
async.start_listening
async.listen('time_change', :notify_time_change)
async.listen('keepalive', :report_keepalive)
end
def notify_time_change(channel, new_time)
@socket << new_time
rescue Reel::SocketError
info "Time client disconnected"
unlisten(channel)
terminate
end
def report_keepalive(channel, timestamp)
info "Current keepalive timestamp: #{timestamp}"
# unlisten(channel) if timestamp.to_i > @limit
end
end
class WebServer < Reel::Server::HTTP
include Celluloid::Logger
def initialize(host = "127.0.0.1", port = 1234)
info "Time server example starting on #{host}:#{port}"
super(host, port, &method(:on_connection))
end
def on_connection(connection)
connection.each_request do |request|
if request.websocket?
info "Received a WebSocket connection"
handle_websocket request.websocket
else
handle_request request
end
end
end
def handle_request(request)
if request.url == "/"
return render_index(request)
end
info "404 Not Found: #{request.path}"
request.respond :not_found, "Not found"
end
def handle_websocket(socket)
if socket.url == "/timeinfo"
TimeClient.new(socket)
else
info "Received invalid WebSocket request for: #{socket.url}"
socket.close
end
end
def render_index(request)
info "200 OK: /"
request.respond :ok, <<-HTML
<!doctype html>
<html lang="en">
<head>
<meta charset="utf-8">
<title>Reel WebSockets time server example</title>
<style>
body {
font-family: "HelveticaNeue-Light", "Helvetica Neue Light", "Helvetica Neue", Helvetica, Arial, "Lucida Grande", sans-serif;
font-weight: 300;
text-align: center;
}
#content {
width: 800px;
margin: 0 auto;
background: #EEEEEE;
padding: 1em;
}
</style>
</head>
<script>
var SocketKlass = "MozWebSocket" in window ? MozWebSocket : WebSocket;
var ws = new SocketKlass('ws://' + window.location.host + '/timeinfo');
ws.onmessage = function(msg){
document.getElementById('current-time').innerHTML = msg.data;
}
</script>
<body>
<div id="content">
<h1>Time Server Example</h1>
<div>The time is now: <span id="current-time">...</span></div>
</div>
</body>
</html>
HTML
end
end
TimeServer.supervise_as :time_server
WebServer.supervise_as :reel
sleep
Owner

tpitale commented May 24, 2014

Works now with the latest reel at this time (0.5.0).

Seems to only be limited by the available number of postgresql connections.

Owner

tpitale commented May 25, 2014

Revision 14eeea263bb0320c2cedaee36ec6d792e9b1e492 now supports multiple listens per actor.

I've recreated the wait_for_notify from the pg gem and enhanced it to work with Celluloid::IO.

pboling commented Oct 6, 2015

@tpitale The unlisten breaks for me because of @listening = true with:

E, [2015-10-06T11:28:21.122062 #3373] ERROR -- : Actor crashed!
NoMethodError: undefined method `[]=' for true:TrueClass

pboling commented Oct 6, 2015

@tpitale For now I decided to just stop_listening altogether rather than try to manage by channel. I have created a gem based on your work here (with attribution, let me know if you'd like a different kind of mention, or if you'd like to be a collaborator on the project).

https://github.com/pboling/celluloid-io-pg-listener

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment