Skip to content

Instantly share code, notes, and snippets.

@vhodges
Created October 26, 2011 14:20
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save vhodges/1316498 to your computer and use it in GitHub Desktop.
Save vhodges/1316498 to your computer and use it in GitHub Desktop.
Code to listen to redis pubsub channel for events and send them to connected clients
source 'http://rubygems.org'
gem "hiredis", "~> 0.3.1"
gem "em-synchrony"
gem 'em-hiredis'
#gem "redis", "~> 2.2.0", :require => ["redis/connection/synchrony", "redis"]
gem "goliath"
#gem 'em-synchrony', :git => 'git://github.com/igrigorik/em-synchrony.git'
<!DOCTYPE html>
<html>
<body>
<h3>Hello SSE!</h3>
<script>
function getQuerystring(key, default_)
{
if (default_==null) default_="";
key = key.replace(/[\[]/,"\\\[").replace(/[\]]/,"\\\]");
var regex = new RegExp("[\\?&]"+key+"=([^&#]*)");
var qs = regex.exec(window.location.href);
if(qs == null)
return default_;
else
return qs[1];
}
var channel = getQuerystring('channel') || "foo";
var source = new EventSource('/events?channel=' + channel);
// new connection opened callback
source.addEventListener('open', function(e) {
console.log('connection opened');
}, false);
// subscribe to unnamed messages
source.onmessage = function(e) {
console.log(e);
document.body.innerHTML += e.data + '<br />';
alert(e.data);
};
// connection closed callback
source.addEventListener('error', function(e) {
if (e.eventPhase == EventSource.CLOSED) {
console.log('connection closed');
}
}, false);
</script>
</body>
</html>
#
# This is now a working example!
#
require 'rubygems'
require 'em-hiredis'
require 'goliath'
class Wizard < Goliath::API
use Rack::Static, :urls => ["/index.html"], :root => Goliath::Application.app_path("public")
use Goliath::Rack::Params
attr_accessor :redis
def response(env)
path = env[Goliath::Request::REQUEST_PATH]
return [404, {}, "Not found"] unless path == "/events"
channel = env.params[:channel]
return [404, {}, "Channel required"] if channel.nil? || channel == ""
env.logger.info "Connecting to channel '#{channel}' for updates"
env['redis'] = EM::Hiredis.connect
env['redis'].subscribe(channel)
env['redis'].on(:message) do |chn, msg|
env.logger.info "Message received from channel #{chn}: #{msg} (We're looking for #{channel})"
if chn === channel
res = env.stream_send("data:#{msg}\n\n")
end
end
streaming_response(200, {'Content-Type' => 'text/event-stream'})
end
def on_close(env)
path = env[Goliath::Request::REQUEST_PATH]
return unless path == "/events"
channel = env.params[:channel]
return if channel.nil? || channel == ""
env.logger.info "Connection closed, Unsubscribing."
env['redis'].unsubscribe(channel)
env['redis'].close
end
end
@elsurudo
Copy link

Trying to run this, but running into an issue.

When I load index.html, I get this error:

[14029:INFO] 2013-06-26 21:51:01 :: Status: 200, Content-Length: 1074, Response Time: 19.60ms
[14029:INFO] 2013-06-26 21:51:01 :: Connecting to channel 'foo' for updates
[14029:ERROR] 2013-06-26 21:51:01 :: Use pubsub client
... long stack trace ...

Any ideas?

I do have redis installed.

@evotopid
Copy link

evotopid commented Feb 8, 2014

I'm probably "a bit" late, but eventually this can help someone stumbling upon this in the next time so I will share it anyway. The reason for the error is that em-hiredis has been updated.

This is from their readme:
The regular EM::Hiredis::Client no longer understands pubsub messages - this logic has been moved to EM::Hiredis::PubsubClient. The pubsub client can either be initialized directly (see code) or you can get one connected to the same redis server by calling #pubsub on an existing EM::Hiredis::Client instance.

Hope this helps somebody.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment