Skip to content

Instantly share code, notes, and snippets.

@tpitale
Last active March 23, 2017 13:03
Show Gist options
  • Star 8 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save tpitale/3915671 to your computer and use it in GitHub Desktop.
Save tpitale/3915671 to your computer and use it in GitHub Desktop.
Reel WebSocket server using PostgreSQL LISTEN/NOTIFY for crude pub/sub
require 'rubygems'
require 'bundler/setup'
require 'reel'
require 'celluloid/io'
require 'pg'
# Mixin that gives an actor crude pub/sub on top of PostgreSQL LISTEN/NOTIFY.
#
# Including this module also includes Celluloid::IO into the host class.
# The host class must respond to `info` (for example by including
# Celluloid::Logger); #start_listening uses it for logging.
module PGNotifications
  # Hook run on `include`: pulls Celluloid::IO into the including class.
  def self.included(actor)
    actor.send(:include, Celluloid::IO)
  end

  # Registry of channel name (String) => handler method name.
  def actions
    @actions ||= {}
  end

  # Lazily opened, memoized connection to the 'ws_example' database.
  def pg_connection
    @pg_connection ||= PG.connect(dbname: 'ws_example')
  end

  # Publishes +value+ on +channel+.
  #
  # NOTIFY cannot take bind parameters, so the channel is quoted as an
  # identifier and the payload is escaped as a string literal. This keeps a
  # quote character in either one from breaking (or injecting into) the SQL.
  #
  # @param channel [#to_s] channel name
  # @param value [#to_s] payload; converted with to_s before sending
  def notify(channel, value)
    connection = pg_connection
    ident = connection.quote_ident(channel.to_s)
    payload = connection.escape_string(value.to_s)
    connection.exec("NOTIFY #{ident}, '#{payload}'")
  end

  # Registers +action+ as the handler for +channel+ and subscribes to it.
  #
  # @param channel [#to_s] channel name; stored as a String so the lookup in
  #   #start_listening (which uses the String channel name from the
  #   notification) finds it
  # @param action [Symbol, String] name of the method to call with
  #   (channel, payload) for each notification
  def listen(channel, action)
    name = channel.to_s
    actions[name] = action
    pg_connection.exec("LISTEN #{pg_connection.quote_ident(name)}")
  end

  # Runs the notification loop, dispatching every notification to the handler
  # registered for its channel. Returns once #stop_listening has been called
  # and the loop has woken up again.
  def start_listening
    info "Starting Listening"
    @listening = true
    wait_for_notify do |channel, pid, payload|
      info "Received notification: #{[channel, pid, payload].inspect}"
      handler = actions[channel]
      # A notification can arrive for a channel that has no handler (never
      # registered, or removed by #unlisten); skip it instead of calling
      # send(nil, ...), which raises.
      next unless handler

      send(handler, channel, payload)
    end
  end

  # Asks the notification loop to exit. The loop only re-checks the flag
  # after its current wait on the socket returns.
  def stop_listening
    @listening = false
  end

  # Waits on the connection's socket and yields (channel, pid, payload) for
  # every pending notification for as long as @listening is truthy.
  def wait_for_notify(&block)
    io = pg_connection.socket_io
    while @listening
      Celluloid::IO.wait_readable(io) # blocks execution, but unblocks this actor
      pg_connection.consume_input # fetch any input on this connection
      while (notification = pg_connection.notifies)
        block.call(
          notification[:relname], # channel
          notification[:be_pid],  # pid
          notification[:extra]    # payload
        )
      end
    end
  end

  # Unsubscribes from +channel+ and forgets its handler.
  #
  # @listening is the boolean loop flag owned by #start_listening and
  # #stop_listening. Treating it as a per-channel Hash here raised
  # NoMethodError once listening had started, so only the handler registry
  # is touched; the loop keeps running for any other channels.
  #
  # @param channel [#to_s] channel name
  def unlisten(channel)
    name = channel.to_s
    actions.delete(name)
    pg_connection.exec("UNLISTEN #{pg_connection.quote_ident(name)}")
  end
end
# Actor that periodically publishes the current time and a keepalive
# timestamp through PGNotifications#notify.
class TimeServer
  include Celluloid
  include PGNotifications
  include Celluloid::Logger

  # Kicks off #run asynchronously so construction returns immediately.
  def initialize
    async.run
  end

  # Sleeps until just past the next whole second, then registers the two
  # recurring publishers.
  def run
    started_at = Time.now.to_f
    until_next_second = started_at.ceil - started_at + 0.001
    sleep(until_next_second)

    every(1) do
      notify('time_change', Time.now.inspect)
    end
    info('Registered to send time_change')

    every(10) do
      notify('keepalive', Time.now.to_i)
    end
    info('Registered to send keepalive')
  end
end
# Per-connection actor that forwards 'time_change' notifications to a
# WebSocket and logs 'keepalive' notifications.
class TimeClient
  include PGNotifications
  include Celluloid::Logger

  KEEPALIVE_LIMIT = 30 # keep it short

  # @param socket the WebSocket to stream time changes to; it must accept
  #   `<<` (see #notify_time_change)
  def initialize(socket)
    info("Streaming time changes to client")
    @socket = socket
    @limit = Time.now.to_i + KEEPALIVE_LIMIT

    # Start the notification loop first, then subscribe; all three calls
    # are queued asynchronously in this order.
    async.start_listening
    subscriptions = {
      'time_change' => :notify_time_change,
      'keepalive' => :report_keepalive
    }
    subscriptions.each do |channel_name, handler|
      async.listen(channel_name, handler)
    end
  end

  # Handler for 'time_change': writes the payload to the socket. When the
  # write raises Reel::SocketError, unsubscribes from the channel and
  # terminates this actor.
  def notify_time_change(channel, new_time)
    begin
      @socket << new_time
    rescue Reel::SocketError
      info("Time client disconnected")
      unlisten(channel)
      terminate
    end
  end

  # Handler for 'keepalive': only logs the received timestamp.
  def report_keepalive(channel, timestamp)
    info("Current keepalive timestamp: #{timestamp}")
    # Disabled in the original:
    # unlisten(channel) if timestamp.to_i > @limit
  end
end
# Reel HTTP server that serves the demo page at "/" and hands WebSocket
# connections for "/timeinfo" to TimeClient.
class WebServer < Reel::Server::HTTP
  include Celluloid::Logger

  # @param host [String] address to bind to
  # @param port [Integer] port to bind to
  def initialize(host = "127.0.0.1", port = 1234)
    info("Time server example starting on #{host}:#{port}")
    super(host, port, &method(:on_connection))
  end

  # Routes every request on the connection to the WebSocket handler or the
  # plain HTTP handler.
  def on_connection(connection)
    connection.each_request do |request|
      next handle_request(request) unless request.websocket?

      info("Received a WebSocket connection")
      handle_websocket(request.websocket)
    end
  end

  # Serves the index page for "/" and responds with 404 for everything else.
  def handle_request(request)
    if request.url == "/"
      render_index(request)
    else
      info("404 Not Found: #{request.path}")
      request.respond(:not_found, "Not found")
    end
  end

  # Starts a TimeClient for "/timeinfo"; closes any other WebSocket.
  def handle_websocket(socket)
    unless socket.url == "/timeinfo"
      info("Received invalid WebSocket request for: #{socket.url}")
      return socket.close
    end

    TimeClient.new(socket)
  end

  # Responds 200 with the static demo page, whose script opens the
  # /timeinfo WebSocket and writes each message into #current-time.
  def render_index(request)
    info("200 OK: /")
    request.respond(:ok, <<-HTML)
<!doctype html>
<html lang="en">
<head>
<meta charset="utf-8">
<title>Reel WebSockets time server example</title>
<style>
body {
font-family: "HelveticaNeue-Light", "Helvetica Neue Light", "Helvetica Neue", Helvetica, Arial, "Lucida Grande", sans-serif;
font-weight: 300;
text-align: center;
}
#content {
width: 800px;
margin: 0 auto;
background: #EEEEEE;
padding: 1em;
}
</style>
</head>
<script>
var SocketKlass = "MozWebSocket" in window ? MozWebSocket : WebSocket;
var ws = new SocketKlass('ws://' + window.location.host + '/timeinfo');
ws.onmessage = function(msg){
document.getElementById('current-time').innerHTML = msg.data;
}
</script>
<body>
<div id="content">
<h1>Time Server Example</h1>
<div>The time is now: <span id="current-time">...</span></div>
</div>
</body>
</html>
    HTML
  end
end
# Boot both actors under supervision, registered as :time_server and :reel.
TimeServer.supervise_as(:time_server)
WebServer.supervise_as(:reel)

# Sleep with no duration so the main thread never returns and the process
# stays up.
sleep
@pboling
Copy link

pboling commented Oct 6, 2015

@tpitale Calling unlisten breaks for me, because start_listening has already set @listening = true. It fails with:

E, [2015-10-06T11:28:21.122062 #3373] ERROR -- : Actor crashed!
NoMethodError: undefined method `[]=' for true:TrueClass

@pboling
Copy link

pboling commented Oct 6, 2015

@tpitale For now I decided to just stop_listening altogether rather than try to manage by channel. I have created a gem based on your work here (with attribution, let me know if you'd like a different kind of mention, or if you'd like to be a collaborator on the project).

https://github.com/pboling/celluloid-io-pg-listener

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment