Skip to content

Instantly share code, notes, and snippets.

@maccman
Created July 16, 2010 03:08
Show Gist options
  • Save maccman/477873 to your computer and use it in GitHub Desktop.
Save maccman/477873 to your computer and use it in GitHub Desktop.
#!/usr/bin/env ruby
require File.expand_path('../config/environment', __FILE__)
require 'cramp'
require 'cramp/controller'
Cramp::Controller::Websocket.backend = :thin
class SyncroLongPollSubscribe < Cramp::Controller::Action
on_start :post_init
on_finish :unbind
def before_start
@client = ::Syncro::Client.new
@session = ::Syncro::Session.new(self, @client)
continue
end
def respond_with
[200, {
'Content-Type' => 'application/x-json-event-stream',
'Access-Control-Allow-Origin' => '*',
'Access-Control-Allow-Methods' => 'POST, GET',
'Access-Control-Allow-Headers' => 'X-Session-ID',
'X-Session-ID' => @session.id.to_s,
'Pragma' => @session.id.to_s,
'Connection' => 'Keep-Alive'
}]
end
def post_init
# Send buffer for Safari
send_message(" " * 40);
end
def send_message(data)
render("#{data.size}\n#{data}")
end
def unbind
@session.disconnect
end
end
class SyncroLongPollPublish < Cramp::Controller::Action
on_start :post_init
def respond_with
[200, {
'Content-Type' => 'text/html',
'Access-Control-Allow-Origin' => '*',
'Access-Control-Allow-Methods' => 'POST',
'Access-Control-Allow-Headers' => 'X-Session-ID, Content-Type'
}]
end
def post_init
if session_id = @env["HTTP_X_SESSION_ID"]
@session = ::Syncro::Session.find(session_id.to_i)
end
if @session
@session.receive_message(request.body.read)
end
finish
rescue => e
puts "#{e}\n\t" + e.backtrace.join("\n\t")
end
end
Thin::Logging.trace = true
// WebSocket support
//map "/" do
// use Rack::CommonLogger
// run SyncroWebSocket
//end
// Comet support
map "/lp/subscribe" do
use Rack::CommonLogger
run SyncroLongPollSubscribe
end
map "/lp/publish" do
use Rack::CommonLogger
run SyncroLongPollPublish
end
if (typeof WebSocket == "undefined")
(function(){
var MessageBuffer = function(){
this.clear();
this.temp = "";
};
MessageBuffer.fn = MessageBuffer.prototype;
MessageBuffer.fn.add = function(data){
this.string += data;
this.temp += data;
};
MessageBuffer.fn.read = function(n){
var result = this.string.substr(this.pos, n);
this.pos += n;
return result;
};
MessageBuffer.fn.readInt = function(){
var numInd = this.string.indexOf("\n");
if (numInd == -1) return -1;
var num = Number(this.read(numInd));
this.read(1) // The "\n"
return(num);
};
MessageBuffer.fn.clear = function(){
this.string = "";
this.rewind();
};
MessageBuffer.fn.trim = function(){
this.string = this.string.substr(this.pos);
this.rewind();
};
MessageBuffer.fn.rewind = function(){
this.pos = 0;
};
MessageBuffer.fn.eof = function(){
return(this.pos >= this.string.length);
};
MessageBuffer.fn.messages = function(){
var messages = [];
while ( !this.eof() ) {
var length = this.readInt();
if (length == -1) break; // Number is incomplete
var msg = this.read(length);
if (msg.length != length) break; // Message is incomplete
msg = msg.replace(/\s*$/, "");
if (msg != "") // Message is blank (just buffer)
messages.push(msg);
this.trim();
}
this.rewind();
return messages;
};
var WebSocket = function(address){
this.callbacks = {};
this.connected = false;
this.buffer = new MessageBuffer;
this.connect(address);
};
WebSocket.fn = WebSocket.prototype;
// Private
WebSocket.fn.receiveMessage = function(msg){
this.onmessage({data:msg});
};
WebSocket.fn.receiveMessages = function(data){
this.buffer.add(data);
var messages = this.buffer.messages();
for (var i in messages)
this.receiveMessage(messages[i]);
};
WebSocket.fn.rsc = function(e){
switch (this.subscribe.readyState) {
case 2:
// Pragma is one of the only headers we can access
this.sessionID = this.subscribe.getResponseHeader("Pragma");
this.connected = true;
this.onopen();
break;
case 3:
var data = this.subscribe.responseText;
var dataLength = data.length;
if (!this.lastRead)
this.lastRead = 0;
data = data.substr(this.lastRead, dataLength);
this.lastRead = dataLength;
this.receiveMessages(data);
break;
case 4:
this.subscribe.abort();
this.onclose();
break;
}
};
// Public
WebSocket.fn.connect = function(address){
if (!address) throw("Invalid address: " + address);
this.address = address.replace("ws://", "http://");
this.subscribe = new XMLHttpRequest();
this.subscribe.onreadystatechange = this.proxy(this.rsc);
this.subscribe.open("GET", this.address + "/lp/subscribe");
this.subscribe.send();
};
WebSocket.fn.send = function(data){
if (!this.connected) return;
var publish = new XMLHttpRequest();
publish.open("POST", this.address + "/lp/publish");
publish.setRequestHeader("X-Session-ID", this.sessionID);
publish.send(data);
};
WebSocket.fn.onopen = function(){};
WebSocket.fn.onclose = function(){};
WebSocket.fn.onmessage = function(data){};
WebSocket.fn.onerror = function(){};
WebSocket.fn.close = function(){
if ( !this.connected ) return;
this.connected = false;
this.subscribe.abort();
this.onclose();
};
WebSocket.fn.proxy = function(func){
var thisObject = this;
return(function(){
return func.apply(thisObject, arguments);
});
};
window.WebSocket = WebSocket;
window.WebSocketComet = WebSocket;
})();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment