Skip to content

Instantly share code, notes, and snippets.

@STRd6
Created October 7, 2015 23:54
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save STRd6/2ee6dcf7f3b0c709532a to your computer and use it in GitHub Desktop.
Save STRd6/2ee6dcf7f3b0c709532a to your computer and use it in GitHub Desktop.
require 'faye/websocket'
require "sinatra/activerecord"
class Pubsub
KEEPALIVE_TIME = 15 # in seconds
def initialize(app)
@app = app
@clients = []
@channel = "test"
Thread.new do
ActiveRecord::Base.connection_pool.with_connection do |connection|
connection.execute "LISTEN #{@channel}"
begin
loop do
connection.raw_connection.wait_for_notify(0.5) do |channel, pid, payload|
@clients.each do |ws|
ws.send(payload)
end
end
end
ensure
connection.execute "UNLISTEN *"
end
end
end
end
def notify(payload)
puts "Notify: #{payload}"
ActiveRecord::Base.connection_pool.with_connection do |connection|
sql = "NOTIFY #{@channel}, #{connection.quote(payload)}"
puts sql
connection.execute sql
end
end
def call(env)
if Faye::WebSocket.websocket?(env)
# WebSockets logic goes here
ws = Faye::WebSocket.new(env, nil, {ping: KEEPALIVE_TIME })
ws.on :open do |event|
p [:open, ws.object_id]
@clients << ws
end
ws.on :message do |event|
p [:message, event.data]
self.notify event.data
end
ws.on :close do |event|
p [:close, ws.object_id, event.code, event.reason]
@clients.delete(ws)
ws = nil
end
# Return async Rack response
ws.rack_response
else
@app.call(env)
end
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment