Skip to content

Instantly share code, notes, and snippets.

@paraboul
Created February 16, 2010 00:14
Show Gist options
  • Save paraboul/305133 to your computer and use it in GitHub Desktop.
Save paraboul/305133 to your computer and use it in GitHub Desktop.
<?php
// Activate PHP's circular-reference collector for the lifetime of this script.
gc_enable();
/**
 * Minimal single-threaded event loop: non-blocking TCP client sockets plus
 * tick-based timers (one tick is processed per elapsed millisecond).
 *
 * Socket records in $_socks are arrays of:
 *   [0] state (1 = connect in progress, 0 = connected)
 *   [1] the socket handle
 *   [2] on-connect callback (no arguments)
 *   [3] on-read callback (receives the bytes read)
 *   [4] pending outgoing bytes not yet written
 *
 * Timer records in $_timers are arrays of:
 *   [0] interval in ticks, [1] ticks remaining, [2] callback, [3] periodic flag
 */
class Ape_dispatcher
{
    /** @var array socket id => socket record (see class comment) */
    private $_socks = array();

    /** @var array socket id => socket handle, for sockets watched for writability */
    private $_socks_write = array();

    /** @var array timer records (see class comment) */
    private $_timers = array();

    /** @var int number of completed main-loop iterations */
    public $ticks = 0;

    public function __construct()
    {
    }

    /**
     * Return a stable array key for a socket handle.
     *
     * Sockets are resources on older PHP versions (where the int cast yields
     * the resource id) and objects on PHP 8 (where an int cast would map
     * every socket to the same key), so objects are keyed by spl_object_id().
     *
     * @param mixed $sock socket handle
     * @return int
     */
    private function _sock_id($sock)
    {
        return is_object($sock) ? spl_object_id($sock) : (int)$sock;
    }

    /**
     * Forget a socket entirely (read set and write set) and close it.
     *
     * @param mixed $sock socket handle
     */
    private function _drop($sock)
    {
        $id = $this->_sock_id($sock);
        unset($this->_socks[$id], $this->_socks_write[$id]);
        socket_close($sock);
    }

    /**
     * Advance every timer by one tick and fire those that were already at zero.
     *
     * Periodic timers are re-armed with their interval; one-shot timers are
     * removed once the pass is over so the timer list does not grow forever.
     */
    private function _process_tick()
    {
        $finished = array();
        foreach ($this->_timers as $id => &$timer) {
            // Fires when the counter was already 0 *before* this decrement.
            if ($timer[1]-- != 0) {
                continue;
            }
            $timer[2]();
            if ($timer[3]) {
                $timer[1] = $timer[0];
            } else {
                $finished[] = $id;
            }
        }
        unset($timer); // break the reference left behind by foreach

        foreach ($finished as $id) {
            unset($this->_timers[$id]);
        }
    }

    /**
     * Register a timer.
     *
     * @param int      $ms    delay in ticks (the main loop runs one tick per elapsed millisecond)
     * @param callable $func  callback invoked with no arguments
     * @param mixed    $perio truthy to re-arm the timer after each firing
     */
    public function add_timer($ms, $func, $perio = 0)
    {
        $this->_timers[] = array($ms, $ms, $func, $perio);
    }

    /**
     * Start a non-blocking TCP connection and register its callbacks.
     *
     * @param callable $onconnect called with no arguments once the connection is established
     * @param callable $onread    called with each chunk of bytes read from the socket
     * @param string   $host      address to connect to
     * @param int      $port      port to connect to
     * @return mixed the socket handle, or false when the socket could not be created
     */
    public function connect($onconnect, $onread, $host = '127.0.0.1', $port = 6969)
    {
        $sock = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
        if ($sock === false) {
            return false;
        }
        socket_set_option($sock, SOL_SOCKET, SO_REUSEADDR, 1);
        socket_set_nonblock($sock);
        // A non-blocking connect normally reports "in progress" as a failure;
        // completion is detected in start() when the socket becomes writable.
        @socket_connect($sock, $host, $port);

        $id = $this->_sock_id($sock);
        $this->_socks[$id] = array(1, $sock, $onconnect, $onread, '');
        $this->_socks_write[$id] = $sock;
        return $sock;
    }

    /**
     * Queue and/or send data on a socket returned by connect().
     *
     * While the connection is still in progress the data is only buffered.
     * Otherwise the buffered bytes plus $data are written; whatever could not
     * be written stays buffered and the socket is watched for writability.
     * Calls for sockets this dispatcher does not (or no longer) track are ignored.
     *
     * @param mixed  $socket socket handle
     * @param string $data   bytes to send (may be '' to just flush the buffer)
     */
    public function write($socket, $data)
    {
        $id = $this->_sock_id($socket);
        if (!isset($this->_socks[$id])) {
            return;
        }
        if ($this->_socks[$id][0] == 1) {
            $this->_socks[$id][4] .= $data;
            return;
        }

        $data = $this->_socks[$id][4] . $data;
        $len = strlen($data);
        if ($len === 0) {
            unset($this->_socks_write[$id]);
            return;
        }

        $wlen = socket_write($socket, $data, $len);
        if ($wlen === false) {
            $wlen = 0; // nothing was accepted; keep everything queued
        }

        if ($wlen < $len) {
            $this->_socks[$id][4] = (string)substr($data, $wlen);
            $this->_socks_write[$id] = $socket;
        } else {
            $this->_socks[$id][4] = '';
            unset($this->_socks_write[$id]);
        }
    }

    /**
     * Run the event loop. This method never returns.
     *
     * @param callable $func_init  called once with the dispatcher before the loop starts
     * @param callable $func_perio called after every loop iteration with (dispatcher, iteration count)
     */
    public function start($func_init, $func_perio)
    {
        $func_init($this);

        $last = microtime(true);
        $lticks = 0; // elapsed microseconds not yet converted into ticks

        while (1) {
            $read = array();
            $write = array();
            foreach ($this->_socks as $s) {
                $read[] = $s[1];
            }
            foreach ($this->_socks_write as $s) {
                $write[] = $s;
            }

            if (!count($read) && !count($write)) {
                usleep(1000);
            } else {
                // socket_select() takes its arrays by reference, so the
                // "no exceptional set" argument must be a real variable.
                $except = null;
                if (socket_select($read, $write, $except, 0, 1000) > 0) {
                    foreach ($write as $w) {
                        $id = $this->_sock_id($w);
                        if (!isset($this->_socks[$id])) {
                            unset($this->_socks_write[$id]);
                            continue;
                        }
                        if ($this->_socks[$id][0] == 1) {
                            // Reading SO_ERROR clears it, so a failed connect has
                            // to be handled now or it would look successful next time.
                            if (socket_get_option($w, SOL_SOCKET, SO_ERROR)) {
                                $this->_drop($w);
                                continue;
                            }
                            $this->_socks[$id][0] = 0;
                            $this->_socks[$id][2](); /* on_connect */
                        }
                        if (!isset($this->_socks[$id])) {
                            continue;
                        }
                        if ($this->_socks[$id][4] != '') {
                            $this->write($w, '');
                        } else {
                            // Nothing left to send: stop polling for writability.
                            unset($this->_socks_write[$id]);
                        }
                    }

                    foreach ($read as $r) {
                        $id = $this->_sock_id($r);
                        if (!isset($this->_socks[$id])) {
                            continue; // dropped earlier in this iteration
                        }
                        $data = socket_read($r, 8092, PHP_BINARY_READ);
                        if ($data !== false && $data !== '') {
                            $this->_socks[$id][3]($data);
                        } else {
                            // Closed by the peer or read error.
                            $this->_drop($r);
                        }
                    }
                }
            }

            // Convert elapsed wall-clock time into 1 ms ticks.
            $now = microtime(true);
            $lticks += ($now - $last) * 1000000;
            $last = $now;
            while ($lticks >= 1000) {
                $lticks -= 1000;
                $this->_process_tick();
            }

            $func_perio($this, ++$this->ticks);
        }
    }
}
/**
 * One APE client session driven by a dispatcher object.
 *
 * Every command opens a new connection through the dispatcher, sends a single
 * HTTP request carrying the JSON-encoded command, and feeds each item of the
 * JSON response to the handlers registered with addEvent().
 */
class Ape
{
    /** @var mixed socket handle of the most recent request */
    private $_fd;

    /** @var array option name => value (only 'method' is read by this class) */
    private $_opt = array();

    /** @var int counter sent as "chl", incremented for every command */
    private $_chl;

    /**
     * @var array list of array(raw name, callback)
     * Public because the response closure reads it through a captured object
     * variable rather than $this.
     */
    public $_events = array();

    /** @var mixed session id taken from the LOGIN response, false until then */
    public $sessid;

    /** @var object dispatcher providing connect() and write() */
    public $dispatch;

    /** @var mixed prefix used to build the "Host" header (<sub>.ape.com) */
    public $sub = 0;

    /**
     * @param object $dispatch dispatcher providing connect($onconnect, $onread) and write($socket, $data)
     * @param array  $opt      option name => value, applied through setopt()
     */
    public function __construct($dispatch, $opt = array())
    {
        $this->_chl = 0;
        $this->sessid = false;
        $this->dispatch = $dispatch;

        foreach ($opt as $name => $val) {
            $this->setopt($name, $val);
        }

        // Remember the session id so later commands can include it.
        $this->addEvent('LOGIN', function ($obj, $data) {
            if (isset($data->sessid)) {
                $obj->sessid = $data->sessid;
            }
        });
        $this->addEvent('ERR', function ($obj, $data) {
            echo 'ERREUR' . "\n";
        });
    }

    /**
     * Send the CONNECT command.
     *
     * @param array $opt command parameters
     * @return Ape this object, to allow chaining
     */
    public function connect($opt = array())
    {
        $this->cmd('CONNECT', $opt);
        return $this;
    }

    /**
     * Set one option.
     *
     * @param string $name option name ('method' selects 'GET' or 'POST' requests)
     * @param mixed  $val  option value
     */
    public function setopt($name, $val)
    {
        $this->_opt[$name] = $val;
    }

    /**
     * Build and send one command.
     *
     * @param string $name   command name
     * @param array  $params command parameters; omitted from the request when empty
     * @return mixed whatever _request() returns (currently nothing)
     */
    public function cmd($name, $params = array())
    {
        $object = array(
            'cmd' => $name,
            'chl' => ++$this->_chl
        );
        if ($this->sessid) {
            $object['sessid'] = $this->sessid;
        }
        if (count($params)) {
            $object['params'] = $params;
        }
        return $this->_request(array($object));
    }

    /**
     * Open a connection through the dispatcher and send $obj as one HTTP request.
     *
     * @param array $obj list of command arrays, JSON-encoded as the request payload
     */
    private function _request($obj)
    {
        $cmd = json_encode($obj);
        $self = $this; // object handle captured by value; no reference to $this needed

        $this->_fd = $this->dispatch->connect(function () {}, function ($read) use ($self) {
            // The body is whatever follows the first blank line of the chunk.
            $parts = explode("\r\n\r\n", $read, 2);
            if (!isset($parts[1])) {
                return;
            }
            $raws = json_decode($parts[1]);
            if (!is_array($raws) && !is_object($raws)) {
                return; // not JSON, or not a collection of items
            }
            foreach ($raws as $raw) {
                if (!is_object($raw) || !isset($raw->raw)) {
                    continue;
                }
                $payload = isset($raw->data) ? $raw->data : null;
                foreach ($self->_events as $ev) {
                    if ($ev[0] == $raw->raw) {
                        $ev[1]($self, $payload);
                    }
                }
            }
        });

        $method = isset($this->_opt['method']) ? $this->_opt['method'] : 'POST';
        $host = 'Host: ' . $this->sub . '.ape.com' . "\r\n";

        if ($method === 'GET') {
            // NOTE(review): the JSON is placed in the query string unescaped, as before;
            // confirm whether the server expects it URL-encoded.
            $data = 'GET /0/?' . $cmd . ' HTTP/1.1' . "\r\n" . $host . "\r\n";
        } else {
            // Header block is terminated by CRLF CRLF, matching the GET branch
            // and the separator the response handler splits on.
            $data = 'POST /0/ HTTP/1.1' . "\r\n"
                . $host
                . 'content-length: ' . strlen($cmd) . "\r\n\r\n"
                . $cmd;
        }

        $this->dispatch->write($this->_fd, $data);
    }

    /**
     * Register a handler for response items whose "raw" field equals $name.
     *
     * @param string   $name raw name to match
     * @param callable $func called with (this Ape object, the item's "data" value)
     */
    public function addEvent($name, $func)
    {
        $this->_events[] = array($name, $func);
    }
}
/**
 * Load-generation task: creates APE clients at a fixed interval and makes each
 * logged-in client send a periodic CHECK command.
 */
class Ape_task
{
    /** @var int ticks (milliseconds) between two client creations */
    private $_client_every = 1;

    /** @var int maximum number of clients to create */
    private $_num = 1;

    /** @var int seconds between two CHECK commands of a logged-in client */
    private $_check = 8;

    public function __construct()
    {
    }

    /**
     * @param mixed $ms interval between client creations, cast to int
     */
    public function setNewClientEvery($ms)
    {
        $this->_client_every = (int)$ms;
    }

    /**
     * @param mixed $num number of clients to create, cast to int
     */
    public function setNumClients($num)
    {
        $this->_num = (int)$num;
    }

    /**
     * @param mixed $sec interval between CHECK commands in seconds, cast to int
     */
    public function setCheckInterval($sec)
    {
        $this->_check = (int)$sec;
    }

    /**
     * Create a dispatcher and run the task inside its event loop.
     *
     * A periodic timer creates one client per interval until the configured
     * number of clients has been reached. Each client that receives LOGIN gets
     * its own periodic CHECK timer. The call blocks for as long as the
     * dispatcher's start() does.
     */
    public function start()
    {
        // Snapshot the settings so the closures need no access to private state.
        $every = $this->_client_every;
        $limit = $this->_num;
        $check = $this->_check;
        $spawned = 0;

        $loop = new Ape_dispatcher();
        $loop->start(function ($disp) use ($every, $limit, $check, &$spawned) {
            $disp->add_timer($every, function () use ($disp, $limit, $check, &$spawned) {
                if ($spawned >= $limit) {
                    return; // requested number of clients already created
                }
                $spawned++;

                $ape = new Ape($disp);
                $ape->connect()->addEvent('LOGIN', function ($obj, $data) use ($disp, $check) {
                    $disp->add_timer($check * 1000, function () use ($obj) {
                        $obj->cmd('CHECK');
                    }, 1);
                });
            }, 1);
        }, function ($disp, $ticks) {
            // Deliver pending signals when the pcntl extension is available.
            if (function_exists('pcntl_signal_dispatch')) {
                pcntl_signal_dispatch();
            }
        });
    }
}
?>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment