public
Last active

Reel WebSocket server using PostgreSQL LISTEN/NOTIFY for crude pub/sub

  • Download Gist
reel_ws_pg_example.rb
Ruby
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157
require 'rubygems'
require 'bundler/setup'
require 'reel'
require 'pg'
 
# Crude publish/subscribe built on PostgreSQL LISTEN/NOTIFY.
#
# An object that calls #listen must also respond to +info+ (a logging method),
# because #listen logs every notification it receives.
module PGNotifications
  # Seconds to block waiting for a notification before re-checking whether the
  # channel is still being listened to.
  NOTIFY_WAIT_SECONDS = 1

  # Returns the PostgreSQL connection for this object, opening it on first use.
  def pg_connection
    @pg_connection ||= PG.connect(dbname: 'ws_example')
  end

  # Publishes +value+ (converted with #to_s) on +channel+.
  #
  # The channel is quoted as an identifier and the payload is escaped as a
  # string literal, so a payload containing a single quote can neither break
  # nor inject into the statement. Note that a quoted identifier keeps its
  # case; #listen and #unlisten quote the same way, so they stay in agreement.
  def notify(channel, value)
    conn = pg_connection
    payload = conn.escape_string(value.to_s)
    conn.exec("NOTIFY #{conn.quote_ident(channel.to_s)}, '#{payload}';")
  end

  # Subscribes to +channel+ and blocks, invoking
  # <tt>send(action, channel, payload)</tt> for every notification received,
  # until #unlisten is called for the same channel.
  #
  # Instead of polling the server in a tight loop, this waits on the
  # connection for up to NOTIFY_WAIT_SECONDS at a time and then re-checks the
  # listening flag.
  def listen(channel, action)
    (@listening ||= {})[channel] = true

    conn = pg_connection
    conn.exec("LISTEN #{conn.quote_ident(channel.to_s)}")

    while @listening[channel]
      conn.wait_for_notify(NOTIFY_WAIT_SECONDS) do |event, pid, payload|
        notification = { relname: event, be_pid: pid, extra: payload }
        info "Received notification: #{notification.inspect}"
        send(action, channel, payload)
      end
    end
  end

  # Marks +channel+ as no longer listened to (which ends the #listen loop on
  # its next check) and unsubscribes the connection from it.
  def unlisten(channel)
    (@listening ||= {})[channel] = false

    conn = pg_connection
    conn.exec("UNLISTEN #{conn.quote_ident(channel.to_s)}")
  end
end
 
# Actor that publishes the current time on the 'time_change' channel once
# per second.
class TimeServer
  include Celluloid
  # include Celluloid::Notifications
  include PGNotifications

  # Starts the publishing loop via the bang form of #run
  # (NOTE(review): presumably Celluloid's asynchronous call form - confirm
  # against the Celluloid version in use).
  def initialize
    run!
  end

  # Waits until just after the next whole second, then publishes
  # Time.now.inspect on 'time_change' every second.
  def run
    started_at = Time.now.to_f
    # Time left until the next whole second, plus 1 ms so the first tick lands
    # just past the boundary.
    delay = started_at.ceil - started_at + 0.001
    sleep delay

    every(1) do
      notify 'time_change', Time.now.inspect
    end
  end
end
 
# Actor that forwards every 'time_change' notification to one WebSocket.
class TimeClient
  include Celluloid
  # include Celluloid::Notifications
  include PGNotifications
  include Celluloid::Logger

  # Stores the socket and starts listening on 'time_change'.
  #
  # PGNotifications#listen loops until #unlisten is called for the channel, so
  # this constructor does not return before then.
  #
  # websocket - object that accepts messages via <<.
  def initialize(websocket)
    @socket = websocket
    info "Streaming time changes to client"
    listen 'time_change', :notify_time_change
  end

  # Callback invoked by #listen with the channel name and the payload.
  # Pushes the payload to the socket; if the socket raises Reel::SocketError
  # the client stops listening and terminates itself.
  def notify_time_change(channel, new_time)
    begin
      @socket << new_time
    rescue Reel::SocketError
      info "Time client disconnected"
      unlisten(channel)
      terminate
    end
  end
end
 
# Reel HTTP/WebSocket server: serves the demo page at "/" and hands
# WebSocket connections for "/timeinfo" to a TimeClient.
class WebServer < Reel::Server
  include Celluloid::Logger

  # host - interface to bind to (default "127.0.0.1").
  # port - TCP port to listen on (default 1234).
  def initialize(host = "127.0.0.1", port = 1234)
    info "Time server example starting on #{host}:#{port}"
    super(host, port, &method(:on_connection))
  end

  # Connection callback passed to Reel::Server: reads requests off the
  # connection until none remain, dispatching plain HTTP requests and
  # WebSocket upgrades to their respective routers.
  def on_connection(connection)
    while request = connection.request
      case request
      when Reel::Request
        route_request connection, request
      when Reel::WebSocket
        info "Received a WebSocket connection"
        route_websocket request
      end
    end
  end

  # Serves the index page for "/" and responds 404 for everything else.
  def route_request(connection, request)
    if request.url == "/"
      return render_index(connection)
    end

    info "404 Not Found: #{request.path}"
    connection.respond :not_found, "Not found"
  end

  # Attaches a TimeClient to sockets opened on "/timeinfo"; closes any other
  # WebSocket.
  def route_websocket(socket)
    if socket.url == "/timeinfo"
      TimeClient.new(socket)
    else
      info "Received invalid WebSocket request for: #{socket.url}"
      socket.close
    end
  end

  # Responds 200 with the demo page.
  #
  # The page writes each WebSocket message with textContent rather than
  # innerHTML: the message is whatever was published on the notification
  # channel, so it must never be interpreted as markup. The script lives at
  # the end of <body> so the document is valid HTML and the target element
  # exists before the script runs.
  def render_index(connection)
    info "200 OK: /"
    connection.respond :ok, <<-HTML
<!doctype html>
<html lang="en">
<head>
  <meta charset="utf-8">
  <title>Reel WebSockets time server example</title>
  <style>
    body {
      font-family: "HelveticaNeue-Light", "Helvetica Neue Light", "Helvetica Neue", Helvetica, Arial, "Lucida Grande", sans-serif;
      font-weight: 300;
      text-align: center;
    }

    #content {
      width: 800px;
      margin: 0 auto;
      background: #EEEEEE;
      padding: 1em;
    }
  </style>
</head>
<body>
  <div id="content">
    <h1>Time Server Example</h1>
    <div>The time is now: <span id="current-time">...</span></div>
  </div>
  <script>
    var SocketKlass = "MozWebSocket" in window ? MozWebSocket : WebSocket;
    var ws = new SocketKlass('ws://' + window.location.host + '/timeinfo');
    ws.onmessage = function(msg){
      document.getElementById('current-time').textContent = msg.data;
    }
  </script>
</body>
</html>
    HTML
  end
end
 
# Start both actors via supervise_as, registering them under the given names.
TimeServer.supervise_as(:time_server)
WebServer.supervise_as(:reel)

# sleep with no argument blocks indefinitely, keeping the main thread (and
# therefore the process) alive.
sleep

Please sign in to comment on this gist.

Something went wrong with that request. Please try again.