Skip to content

Instantly share code, notes, and snippets.

@aikar
Created September 30, 2011 17:22
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save aikar/1254407 to your computer and use it in GitHub Desktop.
Save aikar/1254407 to your computer and use it in GitHub Desktop.
<?php
function echof()
{
call_user_func_array('AppCommUtility::echof', func_get_args());
}
/**
* *************************************************************************
* PHP5 Application Communication System
* *************************************
* by Daniel Ennis <daniel@danielennis.com>
*
* //** Contact **\\
* email: daniel@danielennis.com
* irc: Aikar@irc.ffochat.com, Aikar@irc.freenode.net, Aikar@irc.gamesurge.net
*
* *************************************************************************
* //** Requirements **\\
* OS :> Linux/Unix or Windows*
* PHP:> PHP5 with CLI access (SSH)
* * :> php_sockets.dll is required for Windows based systems to replace usleep();
*
* :> No external extensions are required unless you are on a Windows Server.
*
* *************************************************************************
* //** Description **\\
* This class library is intended to provide a "Server" like design to your
* application.
*
* Working together these classes can provide a "psuedo threading" application
* design, allowing you to distribute tasks over multiple processes, and
* letting them communicate
*
* Back to the original application when finished. This can really expediate processing tasks
* That use up alot of CPU time waiting on responses for things such as Socket based communicates
* And HTTP queries.
*
* AppCommServer class library is intended to be extended by your own class, and you define the
* Abilities it can perform.
*
* By default, you can initially create a new instance of your app and call the Start()
* method to start the main application Loop.
*
* Start may be called in your class constructor so it does not need to be called on creation.
*
* On start, first a ServerInit() method is called. You may overload this and provide any initiation functions here.
*
* Then, the main application loop starts. You will then define a function called EntryPoint(). This
* function is executed every loop your application makes. The interval for loops is default 1 second,
* and may be changed in your Start() method (@see AppCommServer::Start)
*
* Then comes Commands. Anytime a command is issued to the application, a function prefixed with
* Command_ is checked for existence, and ran with your issued command. For example, If you typed
* help, AppCommServer will try to execute a function named Command_help($var);
*
* All Command overloads have 1 variable, which may be a string or an array.
*
* If you are typing commands directly into the Console window, the parameter is usually a string.
* However, if you format it as a PHP array (however, strictly as all key value pairs MUST be enclosed
* in double quotes, eg: array("a" => "b")), it will be sent to your function as an array.
*
* If you wish to change the loop interval, you may call setTimer($time) function. The time is in
* milliseconds, so please set the time properly.
*
* AppCommClient is another class intended to be used with AppCommServer.
* From now on AppCommServer will be referred to as ACS and AppCommClient will be referred
* to as ACC. ACC Provides the ability to spawn another PHP Process, and communicate with it.
* But ACC was designed in that the spawned php file happens to be another ACS extended application.
*
* Just as you are able to issue commands to the console, ACS gives you the ability to issue commands
* to its process. The major difference is all communications between an ACC instance to an ACS
* application are encoded. This means you can safely send arrays and binary data to the children.
* Please note that this is external processes. RESOURCE HANDLES WILL NOT TRANSFER! You can not
* pass an open handle to a socket or a MySQL DB Connection and expect it to 'share'.
*
* However, if the class you are passing supports the magic methods __sleep and __wakeup, it may
* have enough information to transfer itself over the pipes, but it will not be a 'reference',
* it will be a new copy, ie db connection.
*
* ACC and ACS do this by first serializing all data, the base64 encoding it to ensure binary safety,
* and sending the data over stdin input pipe. Then the data is read over stdread, and decoded into arrays.
* All the array data is in the form of cmd:data pairs like array('cmd' => 'mycommand', 'data' => 'mydata');
*
* There are some built in commands that are handled internally, but these are double underscore prefixed so not
* likely to conflict with what you are doing.
*
* While your entry point is ran on a specified interval, reading the data pipes for new commands is still
* performed while the main entry point is sleeping so your commands will execute fast and not be constrained
* by your EntryPoint's timer.
*
* The ACS and ACC classes provide many useful utilities to utlizing its features, please read this files
* full documentation.
*
* *************************************************************************
* //** License **\\
* > This program is free software; you can redistribute it and/or
* > modify it under the terms of the GNU General Public License
* > as published by the Free Software Foundation; either version 2
* > of the License, or (at your option) any later version.
* >
* > This program is distributed in the hope that it will be useful,
* > but WITHOUT ANY WARRANTY; without even the implied warranty of
* > MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* > GNU General Public License for more details.
* >
* > You should have received a copy of the GNU General Public License
* > along with this program; if not, write to the Free Software
* > Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
* *************************************************************************
*/
set_time_limit(0);
define('ENV___APPCOMM_NOTCONTROLLER', getenv('__APPCOMM_NOTCONTROLLER'));
define('ENV___APPCOMM_SERVERNAME', getenv('__APPCOMM_SERVERNAME'));
error_reporting(E_ALL & ~E_NOTICE);
ini_set('display_errors', 'Off');
ini_set('log_errors', 'On');
ini_set('error_log', dirname(dirname(__FILE__)).'/error_log');
set_error_handler(function($errno, $errmsg)
{
try{throw new Exception();} catch(Exception $e)
{
$err = "Error ($errno): $errmsg\n\n" . $e->getTraceAsString();
trigger_error($err, E_USER_WARNING);
fwrite(STDERR, $err, strlen($err));
}
}, E_WARNING);
class AppCommReadBuffer
{
public $readBuffer = '', $stream = null;
function __construct($stream, $readCallback, $args = array())
{
$this->stream = $stream;
AppCommEvents::registerReadStream($stream, $readCallback, $this, $args);
}
public function read()
{
$data = '';
while($d = fread($this->stream, 8192))
{
$data .= $d;
}
//$data = stream_get_contents($this->stream);
return $data;
}
public function &readToBuffer()
{
$this->readBuffer .= $this->read();
return $this->readBuffer;
}
}
class AppCommWriteBuffer
{
public $writeBuffer = array(), $stream = null;
function __construct($stream)
{
$this->stream = $stream;
AppCommUtility::registerWriteStream($stream, $this);
}
public function sendData($data)
{
//if(!strstr($data, "<appcomm")) echo "<$data>";
if(false && $this->stream == STDOUT)
{
echo $data;
}else{
$this->writeBuffer[] = $data;
AppCommEvents::$writeQueue[(int) $this->stream] = $this->stream;
}
}
}
class AppCommReadBase
{
protected $commandCallbacks = array(), $unknownHandler = null;
/**
* Registers a callback function. Specify a specific function to execute when a client issues a command.
* for more organized code.
* @param object $cmd What command to act upon.
* @param object $function What function to all, can use array($class,$method) format.
* @return
*/
public function RegisterCommand($cmd, $function)
{
$this->commandCallbacks[$cmd] = $function;
}
public function setUnknownHandler($handler)
{
$this->unknownHandler = $handler;
}
public function parseCommands(&$data)
{
return AppCommUtility::ParseCommands($data);
}
public function onRecv($buffer, $data)
{
return $data;
}
public function onFeof($stream)
{
AppCommEvents::unregisterReadStream($stream);
}
public function _onData($stream, AppCommReadBuffer $buffer)
{
if (!feof($stream))
{
$data = $buffer->read();
//$data = stream_get_contents($stream);
$this->onStreamData($buffer, $data);
}else{
$this->onFeof($stream);
}
}
public function onStreamData($buffer, $data)
{
$data = $this->onRecv($buffer, $data);
$buffer->readBuffer .= $data;
//Make sure we got some data.
if (strlen($buffer->readBuffer))
{
//Decode the incoming data and process it.
$cmds = $this->parseCommands($buffer->readBuffer);
foreach ($cmds as $r)
{
//extract the command into $cmd/$data vars
if (is_array($r))
{
$cmd = $r['cmd'];
$data = $r['data'];
} else
{
//If $r is a string, $r is the command name. default null data
$data = NULL;
}
//build an object to return in the results that includes the socket that issued this.
$object = array(
'__socket' => $buffer, 'cmd' => $cmd, 'data' => $data
);
//If a command callback exists for this command, call it.
if (array_key_exists($cmd, $this->commandCallbacks))
{
$return = call_user_func($this->commandCallbacks[$cmd], $buffer, $data);
/*if ($r['data']['__return'])
{//TODO: Get results from command, trans id
//If __return was set, its EXPECTING a response from the called command, so we need
// to send it back along with the transaction ID so it can use the data.
$this->Send($buffer, 'return', array(
'__transid' => $r['data']['__transid'], 'data' => $return
));
}*/
} else if (!is_null($this->unknownHandler))
{
//else, return it unhandled.
call_user_func($this->unknownHandler, $buffer, $cmd, $data);
}
}
}
}
}
class AppCommEvents
{
static public
$readStreams = array(),
$readStreamsFunctions = array(),
$writeQueue = array(),
$writeStreams = array(),
$tickFunctions = array(),
$tickIntervalU = null,
$tickInterval = null,
$eventLoopRunning = true,
$hasRegisteredTick = false;
static public function
setTimeout($function, $interval, $args = array())
{
self::$tickIntervalU = 1000;
self::$tickInterval = 0;
if(!self::$hasRegisteredTick)
{
self::$hasRegisteredTick = true;
register_tick_function(array('AppCommEvents', 'runTickFunctions'));
}
array_push(self::$tickFunctions, array(
'function' => $function,
'interval' => $interval,
'args' => $args,
'runOnce' => true,
'next' => (microtime(true) * 1000) + $interval
));
}
static public function
setInterval($function, $interval, $args = array())
{
self::registerTickFunction($interval, $function, $args);
}
static public function
registerTickFunction($interval, $function, $args = array())
{
self::$tickIntervalU = 1000;
self::$tickInterval = 0;
if(!self::$hasRegisteredTick)
{
self::$hasRegisteredTick = true;
register_tick_function(array('AppCommEvents', 'runTickFunctions'));
}
array_push(self::$tickFunctions, array(
'function' => $function,
'interval' => $interval,
'args' => $args,
'runOnce' => false,
'next' => (microtime(true) * 1000) - $interval
));
}
static public function
registerReadStream($stream, $function,
AppCommReadBuffer $readBuffer = null, $args = array())
{
if (is_resource($stream))
{
$index = (int) $stream;
self::$readStreams[$index] = $stream;
self::$readStreamsFunctions[$index] = array(
'function' => $function,
'readBuffer' => $readBuffer,
'args' => $args
);
}
}
static public function
registerWriteStream($stream, $function,
AppCommWriteBuffer $writeBuffer, $args = array())
{
if (is_resource($stream))
{
$index = (int) $stream;
self::$writeStreams[$index] = array(
'stream' => $stream,
'function' => $function,
'writeBuffer' => $writeBuffer,
'args' => $args
);
}
}
static public function
unregisterReadStream($stream)
{
if(is_resource($stream))
{
$index = (int) $stream;
unset(self::$readStreams[$index], self::$readStreamsFunctions[$index]);
}
}
static public function
unregisterWriteStream($stream)
{
if(is_resource($stream))
{
$index = (int) $stream;
unset(self::$writeStreams[$index]);
}
}
static public function stopEventLoop()
{
self::$eventLoopRunning = false;
}
static public function runEventLoop()
{
self::$eventLoopRunning = true;
declare(ticks=1)
{
while (self::$eventLoopRunning)
{
$_readStreams = self::$readStreams;
self::$writeQueue = array();
foreach (self::$writeStreams as $id => $stream)
{
if (count($stream['writeBuffer']->writeBuffer))
{
self::$writeQueue[$id] = $stream['stream'];
}
}
if (count($_readStreams) || count(self::$writeQueue))
{
do{
$readStreams = $_readStreams;
$writeStreams = self::$writeQueue;
stream_select( $readStreams,
$writeStreams,
$e = null, self::$tickInterval,
self::$tickIntervalU
);
} while(!count($readStreams) && !count($writeStreams) &&
(count($_readStreams) || count(self::$writeQueue))
);
if (count($readStreams))
{
foreach ($readStreams as $stream)
{
$index = (int) $stream;
if($index)
{
$function = self::$readStreamsFunctions[$index]['function'];
$args = self::$readStreamsFunctions[$index]['args'];
$readBuffer = self::$readStreamsFunctions[$index]['readBuffer'];
array_unshift($args, $readBuffer);
array_unshift($args, $stream);
call_user_func_array($function, $args);
//if(is_resource($stream) && feof($stream)) self::unregisterReadStream ($stream);
}
}
}
if (count($writeStreams))
{
foreach ($writeStreams as $stream)
{
$index = (int) $stream;
if($index)
{
$function = self::$writeStreams[$index]['function'];
$args = self::$writeStreams[$index]['args'];
$writeBuffer = self::$writeStreams[$index]['writeBuffer'];
if(is_null($args))
{
//echof('ISNULL ' . $index . print_r(self::$writeStreams,true));
continue;
}
if (!ENV___APPCOMM_NOTCONTROLLER)
{
//echo "====== idx: $index ======\n";
//echo print_r($writeBuffer,true) ;
}
array_unshift($args, $writeBuffer);
array_unshift($args, $stream);
call_user_func_array($function, $args);
//if(is_resource($stream) && feof($stream)) self::unregisterWriteStream ($stream);
}
}
}else{
/* ob_start();
var_dump(self::$writeStreams[2]['writeBuffer']->writeBuffer);
$data = ob_get_clean();
echof("===== no write =====\n$data");*/
}
}else{
AppCommUtility::usleep(1000);
}
}
}
}
static public function runTickFunctions()
{
$mstime = (microtime(true) * 1000);
foreach (self::$tickFunctions as $key => &$func)
{
// TODO: need to do some kind of last+interval so if the app
// gets lagged behind it'll 'catch up'.
if ($func['next'] <= $mstime)
{
$func['next'] += $func['interval'];
call_user_func_array($func['function'], $func['args']);
if($func['runOnce'])
{
unset(self::$tickFunctions[$key]);
}
}
}
}
}
class AppCommClient extends AppCommReadBase
{
// The process Handle for child spawn setup.
private $Process = NULL;
// Handles to communication pipes (stdout, stdin, stderr)
private $outgoingDataStream = NULL;
private $incomingDataStream = NULL;
private $incomingErrorStream = NULL;
//Buffer for received commands.
private $recvBuffer = '';
// The name of the client, can be specified on creation to be descriptive.
// Defaults to app name
public $ClientName = '';
// Holds queued commands incase multiple came in on 1 data packet,
// or incase a call needs to check the buffer, or waiting for a specific
// reply to handle. This provides the ability to get the commands parsed
// in the order they were received to its best effort.
public $QueueStack = array();
// This libraries version #
const VERSION = 1.2;
/**
* Spawns a new process and stores the handles to its pipes/process.
* @param string $app The applications filename to start
* @param string $name [optional] An optional name to give to the
* client/remote app.
* @param array $startupcommands [optional] An array of cmd:data pairs to
* send on start
* @param array $env [optional] Enviroment variables to initiate on remote
* process.
* @return
*/
function __construct($app, $name = NULL, $startupcommands = array(),
$env = array())
{
// Make sure an application is defined...
if ($app == NULL)
die('Invalid Client Construction, no application passed!');
//Was a custom name defined? if not use the app name as the client name.
$this->ClientName = (strlen($name) > 0 ? $name : $app);
//Make sure the app file exists...
if (file_exists($app))
{
//Check if its executable to execute it by the system instead of through the php command.
if (is_executable($app))
{
//If this is executeable, did the parameter already specify a path to execute the binary as?
// ie: './app' vs plain 'app'. executing it w/o php prefix requires a full/relative path.
// prepend ./ if needed.
$fullapp = (substr($app, 0, 2) != './' && substr($app, 0, 3) != '../' && $app[0] != '/'
? './' : '') . $app;
} else
{
//The file is not executable, so execute it with the php command.
if (getenv('XDEBUG_CONFIG'))
{
$fullapp = 'php -c php-debug.ini ' . $app;
} else
{
if(strpos($app, ' '))
{
$fullapp = $app;
}else{
$fullapp = 'php ' . $app;
}
}
}
$Pipes = array(); // Temporary holder for Open Handles.
//Custom enviroment variables.
//We will pass the custom name specified to the child, so it can know to use that name too if supplied.
//We also want to pass a notcontroller that tells the process its a child of another, so it can know
//to encode all outgoing data.
// merge in the _ENV array with custom supplied ENV vars.
$env = array(
'__APPCOMM_SERVERNAME' => $name, '__APPCOMM_NOTCONTROLLER' => 1
);
if (getenv('XDEBUG_CONFIG'))
$env['XDEBUG_CONFIG'] = getenv('XDEBUG_CONFIG');
//Open the process.
$this->Process = proc_open($fullapp, array(
//What pipes we will be needing to use (open handles to use them in $Pipes)
0 => array(
"pipe", "r"
), 1 => array(
"pipe", "w"
), 2 => array(
"pipe", "w"
)
//The pipes array to store the handles.
), $Pipes, null,
//Our custom enviroment variables.
$env);
//Make sure the process started
if (isset($this->Process) && $this->Process)
{
//Assign the pipes to readable names.
$this->outgoingDataStream = $Pipes[0];
$this->incomingDataStream = $Pipes[1];
$this->incomingErrorStream = $Pipes[2];
//Turn off blocking (so that reading pipes will not 'block' the current thread waiting for data to be ready)
// With this off, the read command will return instantly if theres no data and just do nothing until next check.
stream_set_blocking($this->outgoingDataStream, false);
stream_set_blocking($this->incomingDataStream, false);
stream_set_blocking($this->incomingErrorStream, false);
$writeBuffer = new AppCommWriteBuffer($this->outgoingDataStream);
$readBuffer = new AppCommReadBuffer($this->incomingDataStream, array($this, '_onData'));
$errorBuffer = new AppCommReadBuffer($this->incomingErrorStream, array($this, '_onError'));
$this->writeBuffer = $writeBuffer;
$this->readBuffer = $readBuffer;
$this->errorBuffer = $errorBuffer;
$this->RegisterCommand('__log', array($this, '__handleLog'));
//Sleep some to give the remote process a little time to fully open.
AppCommUtility::usleep(20000);
//$status = proc_get_status($this->Process);
//Was any initial commands supplied? Issue them now.
foreach ($startupcommands as $key => $value)
$this->SendCommand($key, $value);
} else
{
//Something occured and the application could not start.
trigger_error('Could not start App: ' . $app);
}
} else
{
//Application file does not exists.
trigger_error('App does not exists: ' . $app);
}
}
function __handleLog($buffer, $data)
{
AppCommUtility::HandleLog($data);
}
function _onError($stream, $buffer)
{
if(!feof($stream))
{
echo '[ERROR: ' . $this->ClientName . '] <' . $buffer->read() . ">\n======\n";
}else{
echo '[FEOF]' . $this->ClientName;
$this->Close();
}
}
/**
* Destroys the remote process.
* @return
*/
function __destruct()
{
//Close the process if its open when the client is being destroyed.
$this->Close();
}
//
/**
* $client = new ACC(), if $client is used in a sring fashion like echo $client, return client name.
* @return string
*/
function __toString()
{
return $this->ClientName;
}
//
/**
* send a quit command and close the process.
* @param string $reason [optional] A reason for shutting down the process.
* @return
*/
public function Close($reason = null)
{
if ($this->isActive()
)$this->SendCommand('quit', $reason);
if ($this->outgoingDataStream)
fflush($this->outgoingDataStream);
if ($this->incomingDataStream)
stream_get_contents($this->incomingDataStream);
if ($this->incomingErrorStream)
stream_get_contents($this->incomingErrorStream);
AppCommEvents::unregisterReadStream($this->incomingErrorStream);
AppCommEvents::unregisterReadStream($this->incomingDataStream);
AppCommEvents::unregisterWriteStream($this->outgoingDataStream);
fclose($this->outgoingDataStream);
fclose($this->incomingDataStream);
fclose($this->incomingErrorStream);
if ($this->Process)
proc_close($this->Process);
unset($this->Process);
}
//
/**
* Check if the process is still alive (not crashed out)
* @return bool
*/
public function isActive()
{
if (!($s = @stream_get_meta_data($this->outgoingDataStream)))
return false;
return!($s['eof'] || $s['timed_out']);
}
//
/**
* Sends a ping command to the remote process and waits for a pong reply to make sure it is responsive.
* @return bool Whether the process is responding or not.
*/
public function isResponding()
{
return true;
// TODO: Fix this
$pong = time();
$response = $this->GetResponseFromCommand('__ping', $pong, array(
'pong', '__pong'
), 1);
return ($response['data'] == $pong ? true : false);
}
//
/**
* Sends data to the remote process, encoded.
* @param object $cmd The command to execute on the remote process.
* @param object $args [optional] Data to pass to the remotely called function.
* @return bool
*/
public function SendCommand($cmd, $args = NULL)
{
if (!$this->isActive())
return false;
//Send the Data
AppCommUtility::SendData($cmd, $args, $this->writeBuffer);
return true;
}
/**
* An alias to SendCommand.
* @see AppCommClient::SendCommand
* @param object $cmd The command to execute on the remote application.
* @param object $args [optional] Data to pass to the remotely called function.
* @return bool
*/
public function SendData($cmd, $args = NULL)
{
return $this->SendCommand($cmd, $args);
}
/**
* Sends a command, and then specify a specific command the remote process should be replying with,
* so you can call function remotely and return the results it returned. By default use *
* reply, so the first response will be whats returned.
* @param object $cmd Command to issue on remote Process.
* @param object $args Arguments to pass to the remote function call.
* @param object $waitforcommand [optional] Command to wait for in response to issued.
* @param object $wait [optional] Timeout incase remote process does not respond (hanged), in seconds.
* @return The data that was returned on the remote process.
*/
public function GetResponseFromCommand($cmd, $args,
$waitforcommand = array(
'*'
), $wait = 5)
{
throw new Exception("Not Yet Implemented");
/*
//Send the command to the remote process.
$this->SendCommand($cmd, $args);
$response = NULL;
$time = time();
//Wait for response
while (true)
{
//Pass true to get response to skip the queue. Otherwise well be stuck in a loop.
$response = $this->GetResponse(true);
//Process the response
if (is_array($response))
{
//Is it our expected response?
if (in_array($response['cmd'], $waitforcommand) || in_array('*', $waitforcommand))
{
//we got a valid response, break the while loop
break;
}
//we got a response, but it wasnt what we wanted, throw it on the queue
//so the next GetResponse will pick it up outside of this function.
$this->QueueStack[] = $response;
} else
{
//Getting no data returned.. Do a check to make sure we dont wait too long here.
if (time() - $time > $wait)
{
return array(
'cmd' => 'error', 'data' => 'Timeout while waiting for response.'
);
}
}
//Let our CPU breathe 10 ms
AppCommUtility::usleep(50000);
}
return $response;*/
}
/**
* Checks the stdout buffer of the remote process to see if it sent any data back to the parent.
* If it did, process it.
* @param object $skipqueue [optional] Should we skip the internal queue stack, used for GetResponseByCommand.
* @param object $addtoqueue [optional] Don't return the result, always add to Queue.
* @return array the response, or first off the queue.
*/
public function GetResponse($skipqueue = false, $addtoqueue = false)
{
/*
if (!$this->isActive())
return NULL;
// If were not told to skip queue stack, and were not forcing an add to queue stack, and
// we have items in the queue stack, return the first queued command.
if (!$skipqueue && !$addtoqueue && count($this->QueueStack) > 0)
{
//return first queued command
return array_shift($this->QueueStack);
}
//Read the buffer for incoming data.
$this->recvBuffer .= stream_get_contents($this->incomingDataStream);
if (strlen($this->recvBuffer) > 0)
{
//We got some data, Decode it to an array of commands(more arrays in cmd:data pairs)
$commandResults = AppCommUtility::ParseCommands($this->recvBuffer);
$returnCommand = NULL;
if (is_array($commandResults) && count($commandResults))
{
//Get the first result from GetCommands.
$returnCommand = array_shift($commandResults);
if (is_array($returnCommand))
{
//Is this a log message?
if ($returnCommand['cmd'] == 'log')
{
//Log messages are handled internally and echo'd to screen.
AppCommUtility::HandleLog($returnCommand['data']);
//So cancel this command and return nothing.
$returnCommand = NULL;
}
//If we still have returnCommand (not log), and force add to queue,
// then add it to the queue and null returnCommand
if ($returnCommand && $addtoqueue)
{
$this->QueueStack[] = $returnCommand;
$returnCommand = NULL;
}
}
//Iterate the remaining commandResults if we received multiple in 1 data stream
foreach ($commandResults as $cmd)
{
//If it was a log, just handle the log
if ($cmd['cmd'] == 'log')
{
AppCommUtility::HandleLog($cmd['data']);
} else
{
//Else add it to the queue if not log.
$this->QueueStack[] = $cmd;
}
}
}
return $returnCommand;
}
return NULL;
*/
}
/**
* Simply checks stderr to see if any errors occured.
* @return string errors data if any, else null.
*/
public function GetError()
{
if (!$this->isActive() || !$this->incomingErrorStream)
return NULL;
//gets an error buffer by reading stderr and returning any errors.
$error = stream_get_contents($this->incomingErrorStream);
if (!empty($error))
{
return $error;
} else
{
return NULL;
}
}
}
/**
* Provides a framework for designing a server like application. Does underlying interal processing
* to read and parse incoming data and execute Command functions in response, and executing a main
* entry point loop at a specified interval.
*
* @package AppCommServer
*/
abstract class AppCommServer
{
//Buffer to hold received commands.
private $recvBuffer = '';
//stdin handle. To read incoming commands.
private $pInput = NULL;
//boolean specifying if the main process loop should continue. false would stop the app and shutdown.
private $active = false;
//The applications name.
protected $AppName = null;
//The timer between entrypoint intervals.
public $timer = 1000;
//This libraries version/
const VERSION = 1.2;
/**
* Starts the main loop at specified timer.
* @param string $name [optional] Name of the application. If NULL and spawned by ACC,
* it uses the ACC Client name. Else itll use the class name.
* @param int $timer [optional] Millisecond interval to EntryPoint.
* @return
*/
public function __construct()
{
$this->Start();
}
public function Start($name = null, $timer = 1000)
{
//Was the app name supplied
if (is_null($this->AppName))
{
//If not, see if this is a child and use the ACC client name.
if (ENV___APPCOMM_SERVERNAME)
{
$this->AppName = ENV___APPCOMM_SERVERNAME;
} else
{
//this is a root parent, use the class name.
$this->AppName = 'AppComm v' . self::VERSION . ' - ' . get_class($this);
}
} else
{
//name was supplied, use that.
$this->AppName = $name;
}
//set the timer interval for EntryPoint.
$this->timer = $timer;
//Open a handle to stdin
//$this->pInput = fopen("php://stdin", "r");
//set blocking to false so it wont hang the thread waiting for new command data
//return blank if no data instead.
//stream_set_blocking($this->pInput, false);
stream_set_blocking(STDIN, false);
$writeBuffer = AppCommUtility::$stdout;
$readBuffer = new AppCommReadBuffer(STDIN, array($this, '_onData'));
$this->writeBuffer = $writeBuffer;
$this->readBuffer = $readBuffer;
if(method_exists($this, 'EntryPoint'))
{
//Call the ServerInit for user defined functions.
AppCommEvents::registerTickFunction($timer, array($this,'EntryPoint'));
}
$this->ServerInit();
//Start the main loop.
AppCommEvents::runEventLoop();
//The loop ended, exit process.
$this->Command_quit('Server Application Ending');
}
/**
* Changes the interval delay between EntryPoint execution.
* @param int $timer Time in milliseconds between execution.
* @return
*/
public function setTimer($timer)
{
$this->timer = $timer;
}
/**
* Semi abstract method. Ran before server starts.
* We don't want to define this as abstract as the app
* may not need to use ServerInit, therefor should not have
* to define it, and default to this empty function.
* @return
*/
protected function ServerInit()
{
}
//
/**
* This is a special sleep function. It performs just like the C++ version,
* as it uses milliseconds instead of secs or usecs. However, this actually
* relies on tiny usleep's so we can check for new commands in the middle of a sleep!
*
* Otherwise a pure Sleep(interval), would make it so issued commands only fired every
* (interval) timeframe. So a 'slow' interval would be very slow to respond to commands.
* @param integer $ms Time in milliseconds to sleep.
* @return
*/
final public function _onData($stream, AppCommReadBuffer $buffer)
{
$buffer->readToBuffer();
if (strlen($buffer->readBuffer) > 0)
{
//Parse the data. Since this is stdin, the data may or may not be encoded
//Also, we may have an impartial encoded command.
//We let ACU::ParseCommands/ParseCommand worry about that and handle accordingly.
$commands = AppCommUtility::ParseCommands($buffer->readBuffer);
//ParseCommands will modify recvBuffer and remove the parsed command for us.
if (is_array($commands) && count($commands))
{
foreach ($commands as $command)
{
//Get the returned function name and arguments from the response.
$funcname = 'Command_' . $command['cmd'];
$fdata = $command['data'];
//Check if the command exists, and if so, execute it.
if (method_exists($this, $funcname))
{
call_user_func(array(
$this, $funcname
), $fdata);
}
}
}
}
}
/**
* Forwarder to AppCommUtility::echof
*/
public function log()
{
call_user_func_array('AppCommUtility::echof', func_get_args());
}
/**
* Simple redirector to the ACU HandleLog command.
* @param string $message
* @return
*/
final public function SendLog($message)
{
AppCommUtility::HandleLog($message);
}
/**
* Redirect to the ACU SendData command.
* @param string $cmd Command id to return to parent.
* @param array/string $data [optional] data to return with the command.
* @return
*/
final public function SendData($cmd, $data = NULL)
{
AppCommUtility::SendData($cmd, $data);
}
/**
* Redirect to the ACU SendData command.
* @param string $cmd Command id to return to parent.
* @param array/string $data [optional] data to return with the command.
* @return
*/
final public function respond($cmd, $data = NULL)
{
AppCommUtility::SendData($cmd, $data);
}
/**
* internal handler for shutting down app incase user uses shutdown as command name.
* @param object $reason
* @return
*/
final protected function Command___shutdown($reason)
{
AppCommUtility::HandleLog($this->AppName . ' - Shutting down - ' . $reason);
exit("App Comm: Shutting Down" . $reason);
}
/**
* Remote ability to change timer interval.
* @param object $timer [optional]
* @return
*/
final protected function Command___settimer($timer = 1000)
{
$this->timer = $timer;
}
/**
* Internal command responder to ping, as used by ACC isResponding Function
* @param object $args Data to respond with to assure responding status.
* @return
*/
final protected function Command___ping($args)
{
AppCommUtility::SendData('pong', $args['data']);
}
/**
* Remote ability to change applications name, not too useful
* @param object $args
* @return
*/
final protected function Command___setname($args)
{
$this->AppName = $args;
}
/**
* Not implemented yet, to be used for tranfering information about the current state of the application.
* @param object $args
* @return
*/
final protected function Command___getstate($args)
{
AppCommUtility::SendData('__state', array(
'name' => $this->AppName
));
}
//Default commands that user can override.
/**
* shuts down the application when issued.
* @param object $args
* @return
*/
public function Command_quit($args)
{
$this->Command___shutdown($args);
}
/**
* responds with pong if pinged.
* @param object $args
* @return
*/
public function Command_ping($args)
{
$this->Command___ping($args);
}
/**
* Simply stops the main loop and exits...
* @return
*/
protected function Stop()
{
AppCommEvents::stopEventLoop();
}
}
/**
* Provides utility functions for both ACS and ACC to use in linking the 2 classes data
* communications together.
*
* This class is static
*
* God i'm tired of documenting already, and I have like 12 more files to go :(
*
* @package AppCommUtility
*/
class AppCommUtility
{
//Prefix and suffixes used to wrap encoded commands.
const encodePrefix = '<appcom:encoded>';
const encodeSuffix = '</appcom:encoded>';
//this classes version.
const VERSION = 1.2;
static $stdout = null;
/**
* Takes a block of data, and detects if its encoded. If its encoded it will decode it
* then process the data and return an array of commands in the data packet.
*
* @param string $data Either base 64 encoded string or plain string command.
* @return array an array of each command (cmd:data arrays)
*/
static final public function ParseCommand(&$buffer, $startMarker = self::encodePrefix, $endMarker = self::encodeSuffix)
{
$return = NULL;
if(strlen($buffer))
{
$start = strpos($buffer, $startMarker);
$end = strpos($buffer, $endMarker, ($start !== false && $start < strlen($buffer) ? $start + 1 : 0));
if($start !== false && $end !== false)
{
$length = $end - $start - strlen($startMarker);
$substring = substr($buffer, $start + strlen($startMarker), $length);
$return = unserialize(
base64_decode(
$substring
)
);
$buffer = substr($buffer, $end+strlen($endMarker));
}else if($start === false)
{
if($end === false)
{
@list($cmd, $args) = explode(' ', $buffer, 2);
$buffer = '';
}else{
$str = substr($buffer, 0, $end);
$buffer = substr($buffer, $end+strlen($endMarker));
@list($cmd, $args) = explode(' ', $str, 2);
}
$return = array('cmd' => $cmd, 'data' => self::var_import($args));
}
}
//var_dump($start, $end, $return);echo"\n------\n";
return $return;
/*
//Trim the data, it shouldnt have spaces.
$data = rtrim($data);
if (strlen($data) > 0)
{
//match all encoded data.
//@todo Fix this to buffer the data and process as it receives a complete
//data segment, as this will drop some data if its received in chunks.
if (preg_match('#^' . preg_quote(self::encodePrefix) . '(.*?)' . preg_quote(self::encodeSuffix) . '#s', $data, $matches))
{
$return = unserialize(
//gzuncompress(
base64_decode($matches[1])
//)
);
//Remove it from the buffer.
$data = preg_replace('#^' . preg_quote(self::encodePrefix) . '(.*?)' . preg_quote(self::encodeSuffix) . '#s', '', $data);
return $return;
} else
{
if (!preg_match('#^' . preg_quote(self::encodePrefix) . '(.*?)#', $data))
{
//Command was not encoded (or @todo needs to be handled as encoded chunks)
//Get the data and return it as single array.
@list($cmd, $args) = @explode(' ', $data, 2);
$data = '';
return array('cmd' => $cmd, 'data' => self::var_import($args));
} else
{
//Incomplete encoded buffer sent.
return NULL;
}
}
} else
{
//No data was passed, return nothing.
return NULL;
}*/
}
static final public function ParseCommands(&$buffer)
{
$return = array();
while ($command = self::ParseCommand($buffer))
{
$return[] = $command;
}
return $return;
}
/**
* Handles a log action, which simply echos to screen if this is not the parent.
* If we are a child (APPCOM NOT CONTROLLER env variable), encode the message with
* a log command to redirect to the parent. If the parent is also a child, it will also forward it
* so the log message will eventually land on the root parent and be echod to the screen.
* @param string $message The log message to display.
* @param bool $notrim [optional] Do we trim the data or not (trimming tabs off start)
* @return
*/
static final public function HandleLog($message, $notrim = false)
{
if(!$notrim) $message = trim($message);
if (!ENV___APPCOMM_NOTCONTROLLER)
{
//If the ENV isnt set, this is the root terminal thats controlling all the clients.
self::$stdout->sendData($message ."\n");
} else
{
//We are a spawned application from an AppCommClient(), relay it back to the next parent.
self::SendData('__log', $message);
}
}
/**
* Encodes data and either optionally echos it (to send to parent), or
* returns it (so it can be sent to a child).
* @param object $cmd
* @param object $data [optional]
* @param object $return [optional]
* @return
*/
static final public function SendData($cmd, $data = NULL,
AppCommWriteBuffer $stream = null,
$noencode = false)
{
//prepend and append the prefix/suffix tags around base64/serialized version of the cmd/data array.
if (!$noencode)
{
$encoded = self::encodePrefix .
base64_encode(
//gzcompress(
serialize(
array('cmd' => $cmd, 'data' => $data)
)
//)
)
. self::encodeSuffix;
} else
{
$encoded = $cmd . $data;
}
if (is_null($stream))
{
$stream = self::$stdout;
}
$stream->sendData($encoded);
}
static final public function
registerWriteStream($stream, AppCommWriteBuffer $writeObj)
{
AppCommEvents::registerWriteStream(
$stream, 'AppCommUtility::_handleWrite', $writeObj
);
}
static final public function _handleWrite($stream, AppCommWriteBuffer $write)
{
if (count($write->writeBuffer))
{
//print_r($write);
$data = reset($write->writeBuffer);
$key = key($write->writeBuffer);
//echo "writing data: $data\n";
$written = fwrite($stream, $data, 8192);
if (!ENV___APPCOMM_NOTCONTROLLER)
{
//echo "wrote $written bytes: $data\n";
}
if ($written !== FALSE)
{
if ($written < strlen($data))
{
$write->writeBuffer[$key] = substr($data, $written);
} else
{
unset($write->writeBuffer[$key]);
}
}
}/*
if (count($write->writeBuffer))
{
$data = array_shift($write->writeBuffer);
$written = fwrite($stream, $data);
if ($written !== FALSE)
{
if ($written < strlen($data))
{
array_unshift($write->writeBuffer, substr($data, $written));
}
}
}*/
}
static final public function mtime()
{
return number_format(microtime(true) * 1000, 0, '', '');
}
static final public function usleep($usecs)
{
if (function_exists('usleep'))
{
usleep($usecs);
} else
{
//This is to emulate usleep() on windows.
if (!function_exists('socket_create'))
{
die("Windows systems require the Sockets Extension. Please enable php_sockets.dll to use this feature.");
}
static $socket = false;
if ($socket === false
)$socket = array(
socket_create(AF_INET, SOCK_RAW, 0)
);
socket_select($read = NULL, $write = NULL, $socket, 0, $usecs);
}
}
/**
* Echo's the message to the parent Application, prefixing the client name when a child.
* @return
*/
static function echof()
{
$argv = func_get_args();
$indent = ' ';
$tabs = 0;
/*if (is_numeric($argv[0]))
$tabs = array_shift($argv);*/
$format = array_shift($argv);
$prefix = '';
if (ENV___APPCOMM_SERVERNAME)
$prefix = '-(' . ENV___APPCOMM_SERVERNAME . '): ';
if (count($argv))
{
self::HandleLog($prefix . vsprintf($format, $argv), true);
} else
{
self::HandleLog($prefix . $format, true);
}
}
function wlog()
{
$argv = func_get_args();
$indent = ' ';
$tabs = 0;
if (is_numeric($argv[0]))
$tabs = array_shift($argv);
$format = array_shift($argv);
$prefix = '';
if (ENV___APPCOMM_SERVERNAME)
$prefix = '-(' . ENV___APPCOMM_SERVERNAME . '): ';
if (count($argv))
{
$message = str_repeat($indent, $tabs) . $prefix . vsprintf($format, $argv);
} else
{
$message = str_repeat($indent, $tabs) . $prefix . $format;
}
file_put_contents(dirname($SERVER['PHP_SELF']) . '/appcomm.log',
@file_get_contents(dirname($SERVER['PHP_SELF']) . '/appcomm.log') .
$_SERVER['PHP_SELF'] . ':' . $message . "\n"
);
}
/** this file is done, stop reading now * */
/** I Said stop reading now... There's no more code in this file. * */
/**
* Fine, if you insist.
* * DON'T TRY TO READ THIS CODE
* * DON'T TRY TO READ THIS CODE
*
* This function was a major headache
*
* works the opposite of var_export, by taking a string formatted like defining
* an array(); and turning it into an actual array
*
* We could use eval, but thats not really that safe....
*
* I'm not even going to try to comment this code... Just... use it and don't even
* try to read it. I wrote it at like 6am in the morning and its HORRIBLE.
*
* But hey it works, One day can rewrite it to be more accurate, as this version is horrible
* but its low priority as the only thing its used for is writing arrays i nthe command line
* typing into an application
*
* Known limitation: All key/value pairs must be enclosed with double quotes. normally
* in an array you could do array( 3 => 'c'), but it must be in array( "3" => "c") format for this.
*
*
* * DON'T TRY TO READ THIS CODE
* * DON'T TRY TO READ THIS CODE
* * DON'T TRY TO READ THIS CODE
* * DON'T TRY TO READ THIS CODE
* * DON'T TRY TO READ THIS CODE
* * DON'T TRY TO READ THIS CODE
*
*
* @param string $string the string to turn into an array.
* @return array The resulting array, or the original string if invalid array.
*/
static final public function var_import($string)
{
if (substr(trim($string), 0, 5) != 'array')
{
return $string;
}
$string = trim($string);
$string = trim(substr($string, 5, strlen($string) - 5));
$bS = false;
$bD = false;
$last = '';
$newstring = '';
for ($i = 0; $i < strlen($string); $i++)
{
$c = substr($string, $i, 1);
if ($c == '"' && $last != "\\"
)$bD = !$bD;
if ($c == "'" && $last != "\\"
)$bS = !$bS;
if (!$bD && !$bS && $c == ')' && $last != ','
)$newstring .= ',';
if (strstr("'\", => array()", $c) || $bS || $bD
)$newstring .= $c;
if (!$bD && !$bS && $c == '('
)$newstring .= 'array';
if ($c != ' '
)$last = $c;
if ($c == ';' && !$bS && !$bD)
break;
}
$chunks = preg_split('(array|,)', $newstring);
$array = array(
);
$p = &$array;
$i = 0;
$bK = false;
$bkey = '';
foreach ($chunks as $ckey => $chunk)
{
$chunk = trim($chunk);
if (strlen($chunk) > 0)
{
if ($chunk == ')')
{
$p = &${'pL' . --$i};
if ($i == 0)
break;
}
else if ($chunk == '(')
{
${'pL' . $i++} = &$p;
if ($bK == true)
{
$p = &$p[$bkey];
$bK = false;
} else
{
$p = &$p[];
}
} else
{
if (strpos($chunk, '=>'))
{
list($key, $value) = explode('=>', $chunk, 2);
} else
{
$key = '';
$value = $chunk;
}
$key = trim($key);
$value = trim($value);
foreach (array(
'key' => $key, 'value' => $value
) as $sKey => $sValue)
{
if (substr($sValue, 0, 1) == '"' || substr($sValue, 0, 1) == "'")
{
$$sKey = substr($sValue, 1, strlen($sValue) - 2);
}
}
if (strlen($value) == 0)
{
$bK = true;
$bkey = $key;
} else
{
if (strlen($key) > 0)
{
$p[$key] = $value;
} else
{
$p[] = $value;
}
}
}
}
}
return $array[0];
}
}
stream_set_blocking(STDIN, false);
stream_set_blocking(STDOUT, false);
AppCommUtility::$stdout = new AppCommWriteBuffer(STDOUT);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment