Skip to content

Instantly share code, notes, and snippets.

@J7mbo
Created October 4, 2013 09:39
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save J7mbo/6823472 to your computer and use it in GitHub Desktop.
Save J7mbo/6823472 to your computer and use it in GitHub Desktop.
/**
* User subscribes with their subject:userId:token
*
* If token does not match userId (check DB), return a 403 else...
*
* Token is regenerated and placed in DB for next time the page is visited,
* userId is retrieved that matches token.
*
* New process spawned which runs looping zmq with argv being userId from
* previous DB check. Process Id retrieved.
*
* Add conn, subject, userId, processId, ip and new token to $clients array
*
* Print above details to console for debugging (or log all this)
*
* @param \Ratchet\ConnectionInterface $conn
* @param type $topic
*/
public function onSubscribe(ConnectionInterface $conn, $subscription)
{
list($subject, $userId, $token) = explode(':', $subscription->getId());
$subscription->add($conn);
$resourceId = $conn->resourceId;
$pidArr = [];
$cmd = sprintf('%s%s --userId=%d --subject=%s --resourceId=%s', self::CMD_PREFIX, self::CMD_TORRENTS, $userId, $subject, $resourceId);
exec(sprintf("%s > /dev/null 2>&1 & echo $!;", $cmd), $pidArr);
$pid = $pidArr[0];
if (empty($pidArr) || $pid == 0)
{
echo sprintf("Couldn't create a new process for user: %d", $userId);
return;
}
if (!array_key_exists($resourceId, $this->clients))
{
$this->clients[$resourceId]['userId'] = $userId;
$this->clients[$resourceId]['token'] = $token;
$this->clients[$resourceId]['conn'] = $conn;
$this->clients[$resourceId]['subject'] = $subject;
$this->clients[$resourceId]['subscription'] = $subscription;
$this->clients[$resourceId]['pid'] = $pid;
}
echo sprintf("Client Connected: %s, userId: %d, subject: %s" . PHP_EOL, $conn->remoteAddress, $userId, $subject);
}
/**
* @param string JSON'ified string we'll receive from ZeroMQ
*/
public function onMessage($data)
{
$data = json_decode($data, true);
var_dump($data);
$result = reset(array_filter($this->clients, function($client) use ($data) {
return $client['conn']->resourceId == $data['resourceId'];
}));
if ($result)
{
$result['subscription']->broadcast($data);
}
else
{
echo "Couldn't find user in this->clients";
return;
}
}
public function onClose(ConnectionInterface $conn)
{
$resourceId = $conn->resourceId;
$result = reset(array_filter($this->clients, function($client) use ($resourceId) {
return $client['conn']->resourceId == $resourceId;
}));
posix_kill($result['pid'], SIGKILL);
unset($this->clients[$conn->resourceId]);
echo sprintf("Client Disconnected: %s" . PHP_EOL, $conn->remoteAddress);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment