Skip to content

Instantly share code, notes, and snippets.

@gggeek
Created July 9, 2013 09:59
Show Gist options
  • Save gggeek/5956177 to your computer and use it in GitHub Desktop.
Save gggeek/5956177 to your computer and use it in GitHub Desktop.
A simple process-manager in php, which uses forking to run tasks - putting a limit on the number of concurrent processes (aka a queue)
<?php
/**
* A simple process manager, forking jobs to run in parallel. Works on linux and windows.
*
* @copyright G. Giunta
* @license GPL v2
*
* @todo add more methods? f.e. one to kill any the executing processes
*/
namespace Forlagshuset\RecommendationBundle\Entities;
class ProcessManager
{
protected $toRun = array();
protected $pipes = array();
protected $childProcs = array();
protected $childResults = array();
protected $startTimes = array();
//protected $pause = 1000;
protected $maxParallel = 0;
/**
* @todo add some options, f.e. throw an exception if any command can not start, or buffers for pipes
*/
public function __construct()
{
}
/**
* Runs commands in parallel, waiting until all of them terminate.
* @param array $commands array of strings. Do not forget to escape them while building them!
* @param int $maxParallel set to 0 for no-limit
* @param int $poll in microseconds, how long to sleep between polling for process termination
* @return array
* @throws \Exception
*/
public function runParallel( array $commands, $maxParallel = 0, $poll = 1000000 )
{
if ( !count( $commands ) )
{
throw new \Exception( "Can not run in parallel 0 commands" );
}
$this->startChildren( $commands, $maxParallel );
do
{
// it's a good idea to sleep a while before we check pipes for the 1st time
usleep( $poll );
}
while( $this->waitFor() > 0 );
return $this->getResults();
}
/**
* Starts commands in parallel - with a maximum parallel level (other commands are queued).
* @param array $commands array of strings. Do not forget to escape them while building them!
* @param int $maxParallel set to 0 for no-limit
* @return int the number of processes started
*
* @see runParallel for an example loop using this function
*/
public function startChildren( array $commands, $maxParallel = 0 )
{
$this->toRun = $commands;
$commandCount = count( $this->toRun );
$this->startTimes = array();
$this->pipes = array();
$this->childProcs = array();
$this->childResults = array_fill( 0, $commandCount, null );
if ( $maxParallel <= 0 || $maxParallel > $commandCount )
{
$maxParallel = $commandCount;
}
$this->maxParallel = $maxParallel;
$started = 0;
for ( $i = 0; $i < $maxParallel; $i++ )
{
if ( $this->startChild( $i ) )
{
$started++;
}
}
return $started;
}
/**
* Checks if there are any child commands finished. If there are any queued, starts them
* @return int number of running processes
*
* @see runParallel for an example loop using this function
*/
public function waitFor()
{
$running = 0;
$time = microtime( true );
for ( $i = 0; $i < count( $this->childProcs ); $i++ )
{
if ( $this->childProcs[$i] !== false )
{
/// @todo see note from Lachlan Mulcahy on http://www.php.net/manual/en/function.proc-get-status.php:
/// to make sure buffers are not blocking children, we should read rom their pipes every now and then
/// (but not on windows, since pipes are blocking and can not be timedout, see https://bugs.php.net/bug.php?id=54717)
$status = proc_get_status( $this->childProcs[$i] );
if ( $status['running'] == false )
{
$this->childResults[$i] = array_merge( $status, array(
'output' => stream_get_contents( $this->pipes[$i][1] ),
'error' => stream_get_contents( $this->pipes[$i][2] ),
'return' => proc_close( $this->childProcs[$i] ),
'starttime' => $this->startTimes[$i],
'stoptime' => $time
) );
$this->childProcs[$i] = false;
}
else
{
$this->childResults[$i] = $status;
$running++;
}
}
}
$started = count( $this->childProcs );
if ( $started < count( $this->toRun ) && $running < $this->maxParallel )
{
for( $i = $running, $j = $started; $i < $this->maxParallel; $i++, $j++ )
{
if ( $this->startChild( $j ) )
{
$running++;
}
}
}
return $running;
}
/**
* Checks if child process i is running
* @param int $i
* @return bool
*/
public function isRunning( $i )
{
if ( $i >= count( $this->childProcs ) || $this->childProcs[$i] == false )
return false;
$status = proc_get_status( $this->childProcs[$i] );
return $status['running'];
}
/**
* Returns true if child process i was started (at least tried to)
* @param int $i
* @return bool
*/
public function wasStarted( $i )
{
return ( $i < count( $this->childProcs ) && $i >= 0 );
}
/**
* Get results for either one process or all of them.
* It can be used to retrieve f.e. the pid of a particular command, after waitFor has been called at least once
* @param integer $i
* @return array
*/
public function getResults( $i = null )
{
return $i !== null ? $this->childResults[$i] : $this->childResults;
}
protected function startChild( $i )
{
$this->startTimes[$i] = microtime( true );
$this->pipes[$i] = null;
$this->childProcs[$i] = proc_open(
$this->toRun[$i],
/// @todo test if error pipe should use 'a' or 'w' on linux
array( array( 'pipe','r' ), array( 'pipe','w' ), array( 'pipe', 'w' ) ),
$this->pipes[$i]
);
fclose( $this->pipes[$i][0] );
if ( !$this->childProcs[$i] )
{
return false;
}
return true;
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment