Skip to content

Instantly share code, notes, and snippets.

@scribu
Created February 8, 2013 03:12
Show Gist options
  • Star 18 You must be signed in to star a gist
  • Fork 3 You must be signed in to fork a gist
  • Save scribu/4736329 to your computer and use it in GitHub Desktop.
Save scribu/4736329 to your computer and use it in GitHub Desktop.
Simulate threads in PHP using only proc_open() and co.
<?php
include __DIR__ . '/threads.php';
$commands = array();
for ( $i=0; $i<10; $i++ ) {
$commands[] = "bash -c 'sleep `shuf -i 1-5 -n 1`; echo $i'";
}
$threads = new Multithread( $commands );
$threads->run();
foreach ( $threads->commands as $key=>$command ) {
echo "Command: ".$command."\n";
echo "Output: ".$threads->output[$key];
echo "Error: ".$threads->error[$key]."\n\n";
}
<?php
// Initial code: http://www.php-code.net/2010/05/running-multiple-processes-in-php/
class Thread {
var $process; // process reference
var $pipes; // stdio
var $buffer; // output buffer
var $output;
var $error;
var $timeout;
var $start_time;
function __construct() {
$this->process = 0;
$this->buffer = "";
$this->pipes = (array)NULL;
$this->output = "";
$this->error="";
$this->start_time = time();
$this->timeout = 0;
}
static function create( $command ) {
$t = new Thread;
$descriptor = array ( 0 => array ( "pipe", "r" ), 1 => array ( "pipe", "w" ), 2 => array ( "pipe", "w" ) );
//Open the resource to execute $command
$t->process = proc_open( $command, $descriptor, $t->pipes );
//Set STDOUT and STDERR to non-blocking
stream_set_blocking( $t->pipes[1], 0 );
stream_set_blocking( $t->pipes[2], 0 );
return $t;
}
//See if the command is still active
function isActive() {
$this->buffer .= $this->listen();
$f = stream_get_meta_data( $this->pipes[1] );
return !$f["eof"];
}
//Close the process
function close() {
$r = proc_close( $this->process );
$this->process = NULL;
return $r;
}
//Send a message to the command running
function tell( $thought ) {
fwrite( $this->pipes[0], $thought );
}
//Get the command output produced so far
function listen() {
$buffer = $this->buffer;
$this->buffer = "";
while ( $r = fgets( $this->pipes[1], 1024 ) ) {
$buffer .= $r;
$this->output.=$r;
}
return $buffer;
}
//Get the status of the current runing process
function getStatus() {
return proc_get_status( $this->process );
}
//See if the command is taking too long to run (more than $this->timeout seconds)
function isBusy() {
return ( $this->start_time>0 ) && ( $this->start_time+$this->timeout<time() );
}
//What command wrote to STDERR
function getError() {
$buffer = "";
while ( $r = fgets( $this->pipes[2], 1024 ) ) {
$buffer .= $r;
}
return $buffer;
}
}
//Wrapper for Thread class
class Multithread {
var $output;
var $error;
var $thread;
var $commands = array();
function __construct( $commands ) {
$this->commands = $commands;
foreach ( $this->commands as $key=>$command ) {
$this->thread[$key]=Thread::create( $command );
}
}
function run() {
$commands = $this->commands;
//Cycle through commands
while ( count( $commands )>0 ) {
foreach ( $commands as $key=>$command ) {
//Get the output and the errors
$this->output[$key].=$this->thread[$key]->listen();
$this->error[$key].=$this->thread[$key]->getError();
//Check if command is still active
if ( $this->thread[$key]->isActive() ) {
$this->output[$key].=$this->thread[$key]->listen();
//Check if command is busy
if ( $this->thread[$key]->isBusy() ) {
$this->thread[$key]->close();
unset( $commands[$key] );
}
} else {
//Close the command and free resources
$this->thread[$key]->close();
unset( $commands[$key] );
}
}
}
return $this->output;
}
}
@tbl0605
Copy link

tbl0605 commented Nov 30, 2016

There's an error in method Thread::isBusy(), it should be:

function isBusy() {
	return ( $this->timeout>0 ) && ( $this->start_time+$this->timeout<time() );
}

@Makiwin
Copy link

Makiwin commented Aug 18, 2017

I made a change in Multithread to handle pool size of threads

class Multithread {
var $output=[];
var $error=[];
var $thread;
var $commands = array();
var $hasPool=false;
var $toExecuted=[];

function __construct( $commands ,$sizePool=0) {


    $this->hasPool=$sizePool>0;
    if ($this->hasPool) {

        $this->toExecuted=array_splice($commands,$sizePool);
    }

    //print_r(count($this->toExecuted));
    $this->commands = $commands;
    foreach ( $this->commands as $key=>$command ) {
        $this->thread[$key]=Thread::create( $command );
    }
}


function run() {
    $commands = $this->commands;
    //Cycle through commands
    while ( count( $this->commands )>0 ) {
        foreach ( $this->commands as $key=>$command ) {
            //Get the output and the errors
            @$this->output[$key].=$this->thread[$key]->listen();
            @$this->error[$key].=$this->thread[$key]->getError();
            //Check if command is still active
            if ( $this->thread[$key]->isActive() ) {
                $this->output[$key].=$this->thread[$key]->listen();
                //Check if command is busy
                if ( $this->thread[$key]->isBusy() ) {
                    $this->thread[$key]->close();
                    unset( $this->commands[$key] );
                    self::launchNextInQueue();

                }
            } else {
                //Close the command and free resources
                $this->thread[$key]->close();
                unset( $this->commands[$key] );
                self::launchNextInQueue();
            }
        }
    }
    return $this->output;
}

function launchNextInQueue(){
    if (count($this->toExecuted)==0) return true;
    reset($this->toExecuted);
    $keyToExecuted=key($this->toExecuted);
    $this->commands[$keyToExecuted]=$this->toExecuted[$keyToExecuted];
    $this->thread[$keyToExecuted]=Thread::create( $this->toExecuted[$keyToExecuted] );
    unset($this->toExecuted[$keyToExecuted]);
}

}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment