Skip to content

Instantly share code, notes, and snippets.

@cpriest
Created May 9, 2015 13:51
Show Gist options
  • Save cpriest/106d11863695d1a31fd3 to your computer and use it in GitHub Desktop.
Save cpriest/106d11863695d1a31fd3 to your computer and use it in GitHub Desktop.
Problem with pThreads
<?php
define('INDEXER_THREADS', 2);
define('READER_THREADS', 2);
define('START_DIR', 'r:/big');
define('READ_SIZE', 1024);
/**
* Executor
*/
class Executor extends Worker {
/**
*/
public function __construct() {
}
}
/**
* Iterates over a directory, adding files to the file queue and directories to the directory queue
* @property Executor $worker
*/
class AsyncIndexer extends Collectable {
/** @var string */
public $Path;
/** @var string[] */
public $tDirpaths = [ ];
/** @var string[] */
public $tFilepaths = [ ];
/** @var string */
public $Path2 = '';
/**
* @param string $Path
*/
public function __construct($Path) {
$this->Path = $Path;
$this->tDirpaths = [ ];
$this->tFilePaths = [ ];
}
public function run() {
printf("%s submitted for indexing\n", $this->Path);
foreach(new DirectoryIterator($this->Path) as $objFile) {
if($objFile->isDot())
continue;
if($objFile->isDir()) {
array_push($this->tDirpaths, $objFile->getPathname());
echo "Adding {$objFile->getPathname()} to tDirpaths.\n";
$this->Path2 = $objFile->getPathname();
}
// else if($objFile->isFile())
// $this->tFilepaths[] = $objFile->getPathname();
}
var_dump($this);
echo "Exit loop {$this->Path}\n";
$this->setGarbage();
}
}
/**
* Reads the given file in chunks
*/
class AsyncReader extends Collectable {
/** @var string */
protected $Path;
/**
* @param string $Path
*/
public function __construct($Path) {
$this->Path = $Path;
}
public function run() {
sprintf("Starting read of File %s in chunks of %d\n", $this->Path, READ_SIZE);
$fh = fopen($this->Path, 'r', false);
while(!feof($fh)) {
$d = fread($fh, READ_SIZE);
}
fclose($fh);
sprintf("Completed read of File %s\n", $this->Path);
$this->setGarbage();
}
}
/**
* MyPool
*
* @property float $WorkRemaining The number of items in the work queue
*/
class MyPool extends Pool {
/**
* @param string $Name
*
* @return int|null
*/
public function __get($Name) {
if($Name === 'WorkRemaining')
return count($this->work);
return NULL;
}
}
$Started = microtime(true);
$IndexerPool = new MyPool(INDEXER_THREADS, 'Executor');
$ReaderPool = new MyPool(READER_THREADS, 'Executor');
$IndexerPool->submit(new AsyncIndexer(START_DIR));
sleep(5);
while(true) {
echo "Collecting...\n";
$IndexerPool->collect(function (AsyncIndexer $worker) use ($IndexerPool, $ReaderPool) {
echo "Collect(): on $worker->Path\n";
var_dump($worker);
// while(count($worker->tDirpaths)) {
// $Dirpath = array_shift($worker->tDirpaths);
// echo "Creating new AsyncIndexer for {$Dirpath}\n";
// $IndexerPool->submit(new AsyncIndexer($Dirpath));
// }
if($worker->isGarbage()) {
echo "{$worker->Path} isGarbage...\n";
return true;
}
return false;
// return $worker->isGarbage();
});
$ReaderPool->collect(function (AsyncReader $worker) {
return $worker->isGarbage();
});
if($IndexerPool->WorkRemaining === 0 && $ReaderPool->WorkRemaining === 0) {
printf('$IndexerPool and $ReaderPool indicate no more work remaining, ending.'.PHP_EOL);
break;
}
printf("Sleeping... Indexer=%d, Reader=%d\n", $IndexerPool->WorkRemaining, $ReaderPool->WorkRemaining);
sleep(1);
}
$IndexerPool->shutdown();
echo '$IndexerPool shutdown...'.PHP_EOL;
$ReaderPool->shutdown();
echo '$ReaderPool shutdown...'.PHP_EOL;
// $TotalTime = 0;
// $pool->collect(function (AsyncExec $work) use(&$TotalTime, &$TotalSize) {
// if($work->isGarbage()) {
// printf("$work->Path: Completed in %.2f seconds.\n", $work->Ended - $work->Started);
// $TotalTime += $work->Ended - $work->Started;
// $TotalSize += $work->Size;
//// var_dump($work);
//// foreach($work->tOutput as $Line)
//// printf(" %d: %s\n", $work->num, $Line);
// return true;
// }
// return false;
// });
// printf("Copied %sMB in %.2fs at %sMBps using %d threads.\n", number_format(MB($TotalSize), 2), $TotalTime, number_format(MB($TotalSize) / $TotalTime, 2), POOL_THREADS);
/**
* @param integer $x
*
* @return float
*/
function MB($x) { return $x / 1024 / 1024; }
/**
* Timer, keeps track of start time and has properties to get timing results
*/
class Timer {
/** @var float */
protected $Started;
/**
* Constructor
*/
public function __construct() {
$this->Started = microtime(true);
}
/**
* @param string $Name
*
* @return float|null
*/
public function __get($Name='') {
switch($Name) {
case 'Elapsed':
return microtime(true) - $this->Started;
}
return NULL;
}
}
@cpriest
Copy link
Author

cpriest commented May 9, 2015

Having trouble with the array in Async indexer on line #54, the array is not being modified either with array_push() nor with traditional ->tDirpaths[] = 'xyz';

On line #61 the var_dump indicates that the $tDirpaths has no entries.

Running the entire code the echo's on Line #55 shows that it is getting the data.

Any ideas?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment