Skip to content

Instantly share code, notes, and snippets.

@ondrejmirtes
Last active August 29, 2015 13:59
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ondrejmirtes/10895929 to your computer and use it in GitHub Desktop.
Save ondrejmirtes/10895929 to your computer and use it in GitHub Desktop.
<?php
use Guzzle\Http\Url;
use Nette\Diagnostics\Debugger;
use Nette\Utils\Strings;
use React\Http\Request;
use React\Http\Response;
use React\SocketClient\ConnectionManager;
use React\Stream\Stream;
class LongpollingApp
{
/**
* @var \React\SocketClient\ConnectionManager
*/
private $connectionManager;
/**
* Array of streams indexed by id query parameter
* @var \React\Stream\Stream[]
*/
private $streams;
public function setConnectionManager(ConnectionManager $connectionManager)
{
$this->connectionManager = $connectionManager;
$this->streams = array();
}
public function onRequest(Request $request, Response $response)
{
if ($request->getMethod() === 'OPTIONS') {
$response->writeHead(200, $this->getCorsHeaders());
$response->end();
} elseif ($request->getMethod() === 'GET') {
$this->handleGet($request, $response);
} elseif ($request->getMethod() === 'POST') {
$this->handlePost($request, $response);
}
}
private function getCorsHeaders()
{
return array(
'Access-Control-Allow-Origin' => '*',
'Access-Control-Allow-Methods' => 'GET, POST, OPTIONS',
'Access-Control-Allow-Headers' => 'Content-Type',
);
}
private function handleGet(Request $request, Response $response)
{
$url = $this->getUrl($request, $response);
if ($url === NULL) {
$response->end();
return;
}
$host = $url->getHost();
if ($host === 'localhost') {
$host = '127.0.0.1';
}
$port = (int) $url->getPort();
$stream = $this->connectionManager->getConnection($host, $port);
$stream->then(function(Stream $stream) use ($request, $response, $host, $port) {
$this->handleCreatedStream($stream, $request, $response, $host, $port);
}, function($error) use ($response) {
$response->end();
Debugger::log($error);
});
}
/**
* @SuppressWarnings(CS.DisallowedExceptionsInCatch) see comment in catch {} block
*
* @param \React\Http\Request $request
* @param \React\Http\Response $response
* @return NULL
*/
private function handlePost(Request $request, Response $response)
{
$request->on('data', function($data) use ($request, $response) {
if (!$data) {
return;
}
$id = $this->getId($request);
if (isset($this->streams[$id])) {
$stream = $this->streams[$id];
$this->sendMessage($stream, $data);
}
try {
$response->writeHead(200, $this->getCorsHeaders());
} catch (\Exception $e) {
// sometimes the head is already written - have no idea why
}
$response->end();
});
}
private function getId(Request $request)
{
return $request->getQuery()['id'];
}
/**
* @param \React\Http\Request $request
* @param \React\Http\Response $response
* @return \Guzzle\Http\Url
*/
private function getUrl(Request $request, Response $response)
{
$query = $request->getQuery();
if (!isset($query['url']) || !isset($query['id'])) {
$response->end();
return NULL;
}
return Url::factory($query['url']);
}
private function trim($data)
{
return trim($data, "\xff\0");
}
private function handleCreatedStream(Stream $stream, Request $request, Response $response, $host, $port)
{
$stream->on('data', function($data, Stream $stream) {
if (Strings::startsWith($data, 'HTTP/1.1 101 WebSocket Protocol Handshake')) {
$stream->emit('handshake');
} else {
$data = $this->trim($data);
$messageParts = explode("\xff\x00", $data);
foreach ($messageParts as $message) {
$stream->emit('websocketMessage', array($message));
}
}
});
$stream->on('close', function() use ($response) {
$response->end();
});
$stream->on('error', function($error) use ($response) {
$response->end();
Debugger::log(new \LongpollingAppException($error));
});
$stream->on('handshake', function() use ($response) {
$response->writeHead(200, $this->getCorsHeaders() + array('Content-Type' => 'application/octet-stream'));
$response->write('connectionBegin');
$response->write(str_repeat('-', 2048));
});
$stream->on('websocketMessage', function($message) use ($response) {
$response->write($message . "\n");
});
$this->handshakeHixie76($stream, $host, $port);
$this->streams[$this->getId($request)] = $stream;
$request->on('end', function() use ($stream, $request) {
$stream->close();
unset($this->streams[$this->getId($request)]);
});
}
private function sendMessage(Stream $stream, $message)
{
$stream->write("\x00" . $this->trim($message) . "\xff");
}
private function handshakeHixie76(Stream $stream, $host, $port)
{
$key1 = $this->generateRandomString(32);
$key2 = $this->generateRandomString(32);
$key3 = $this->generateRandomString(8, FALSE, TRUE);
$header = "GET / HTTP/1.1\r\n";
$header .= "Upgrade: WebSocket\r\n";
$header .= "Connection: Upgrade\r\n";
$header .= 'Host: ' . $host . ':' . $port . "\r\n";
$header .= "Origin: http://foobar.com\r\n";
$header .= 'Sec-WebSocket-Key1: ' . $key1 . "\r\n";
$header .= 'Sec-WebSocket-Key2: ' . $key2 . "\r\n";
$header .= "\r\n";
$header .= $key3;
$stream->write($header);
}
private function generateRandomString($length = 10, $addSpaces = TRUE, $addNumbers = TRUE)
{
$characters = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ!"§$%&/()=[]{}';
$useChars = array();
// select some random chars:
for ($i = 0; $i < $length; $i++) {
$useChars[] = $characters[mt_rand(0, strlen($characters) - 1)];
}
// add spaces and numbers:
if ($addSpaces) {
array_push($useChars, ' ', ' ', ' ', ' ', ' ', ' ');
}
if ($addNumbers) {
array_push($useChars, rand(0, 9), rand(0, 9), rand(0, 9));
}
shuffle($useChars);
$randomString = trim(implode('', $useChars));
$randomString = substr($randomString, 0, $length);
return $randomString;
}
}
(function() {
if (window.WebSocket) {
return;
} else if (window.MozWebSocket) {
window.WebSocket = MozWebSocket;
return;
} else if (typeof XMLHttpRequest === "undefined") {
return;
}
var ie = (function(){
var undef,
v = 3,
div = document.createElement('div'),
all = div.getElementsByTagName('i');
while (
div.innerHTML = '<!--[if gt IE ' + (++v) + ']><i></i><![endif]-->',
all[0]
);
return v > 4 ? v : undef;
}());
if (ie && ie < 8) {
return;
}
window.WebSocket = WebSocket = function(url) {
var id = Math.random().toString(36).slice(2);
this.url = url + '&id=' + id;
this.xhr = WebSocket.createRequest();
this.message = '';
if (WebSocket.isIe()) {
var bindedAbort = this.abort.bind(this);
this.xhr.onerror = bindedAbort;
this.xhr.ontimeout = bindedAbort;
this.xhr.onload = bindedAbort;
var self = this;
setInterval(function() {
self.update();
}, 500);
} else {
this.xhr.onreadystatechange = this.onStateChange.bind(this);
}
this.lastResponseText = '';
};
WebSocket.isIe = function() {
return ie < 10 && typeof XDomainRequest !== "undefined";
};
// Max. size of buffer for incoming messages @see Websocket.message
WebSocket.MAX_BUFFER_SIZE = 1024 * 20;
WebSocket.createRequest = function () {
if (WebSocket.isIe()) {
return new XDomainRequest();
}
return new XMLHttpRequest();
};
WebSocket.prototype.onIeError = function() {
this.abort();
};
WebSocket.prototype.update = function() {
var data = this.xhr.responseText.substring(this.lastResponseText.length);
if (!data) {
return;
}
if (data.substring(0, 'connectionBegin'.length) === 'connectionBegin') {
this.onopen();
} else {
// WebSocket message
var dataParts = data.split("\n");
for (var i in dataParts) {
if (!dataParts[i]) {
continue;
}
this.message += dataParts[i];
if (this.isValidJSON(this.message)) {
this.onmessage({'data': this.message});
this.message = '';
} else if (this.message.lenght > WebSocket.MAX_BUFFER_SIZE) {
this.message = '';
}
}
}
this.lastResponseText = this.xhr.responseText;
};
WebSocket.prototype.isValidJSON = function (string) {
var result = true;
try {
JSON.parse(string);
} catch (e) {
result = false;
}
return result;
}
WebSocket.prototype.abort = function() {
this.onclose();
};
WebSocket.prototype.start = function() {
this.xhr.open('GET', this.url, true);
this.xhr.send();
};
WebSocket.prototype.onStateChange = function() {
if (this.xhr.readyState === 3) {
this.update();
} else if (this.xhr.readyState === 4) {
this.abort();
}
};
WebSocket.prototype.send = function(data) {
var xhr = WebSocket.createRequest();
if (WebSocket.isIe()) {
xhr.open('post', this.url);
} else {
xhr.open('POST', this.url, true);
}
xhr.send(data);
};
WebSocket.prototype.close = function() {
this.xhr.abort();
};
})();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment