Created
May 9, 2015 13:51
-
-
Save cpriest/106d11863695d1a31fd3 to your computer and use it in GitHub Desktop.
Problem with pThreads
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php | |
define('INDEXER_THREADS', 2); | |
define('READER_THREADS', 2); | |
define('START_DIR', 'r:/big'); | |
define('READ_SIZE', 1024); | |
/** | |
* Executor | |
*/ | |
class Executor extends Worker { | |
/** | |
*/ | |
public function __construct() { | |
} | |
} | |
/** | |
* Iterates over a directory, adding files to the file queue and directories to the directory queue | |
* @property Executor $worker | |
*/ | |
class AsyncIndexer extends Collectable { | |
/** @var string */ | |
public $Path; | |
/** @var string[] */ | |
public $tDirpaths = [ ]; | |
/** @var string[] */ | |
public $tFilepaths = [ ]; | |
/** @var string */ | |
public $Path2 = ''; | |
/** | |
* @param string $Path | |
*/ | |
public function __construct($Path) { | |
$this->Path = $Path; | |
$this->tDirpaths = [ ]; | |
$this->tFilePaths = [ ]; | |
} | |
public function run() { | |
printf("%s submitted for indexing\n", $this->Path); | |
foreach(new DirectoryIterator($this->Path) as $objFile) { | |
if($objFile->isDot()) | |
continue; | |
if($objFile->isDir()) { | |
array_push($this->tDirpaths, $objFile->getPathname()); | |
echo "Adding {$objFile->getPathname()} to tDirpaths.\n"; | |
$this->Path2 = $objFile->getPathname(); | |
} | |
// else if($objFile->isFile()) | |
// $this->tFilepaths[] = $objFile->getPathname(); | |
} | |
var_dump($this); | |
echo "Exit loop {$this->Path}\n"; | |
$this->setGarbage(); | |
} | |
} | |
/** | |
* Reads the given file in chunks | |
*/ | |
class AsyncReader extends Collectable { | |
/** @var string */ | |
protected $Path; | |
/** | |
* @param string $Path | |
*/ | |
public function __construct($Path) { | |
$this->Path = $Path; | |
} | |
public function run() { | |
sprintf("Starting read of File %s in chunks of %d\n", $this->Path, READ_SIZE); | |
$fh = fopen($this->Path, 'r', false); | |
while(!feof($fh)) { | |
$d = fread($fh, READ_SIZE); | |
} | |
fclose($fh); | |
sprintf("Completed read of File %s\n", $this->Path); | |
$this->setGarbage(); | |
} | |
} | |
/** | |
* MyPool | |
* | |
* @property float $WorkRemaining The number of items in the work queue | |
*/ | |
class MyPool extends Pool { | |
/** | |
* @param string $Name | |
* | |
* @return int|null | |
*/ | |
public function __get($Name) { | |
if($Name === 'WorkRemaining') | |
return count($this->work); | |
return NULL; | |
} | |
} | |
$Started = microtime(true); | |
$IndexerPool = new MyPool(INDEXER_THREADS, 'Executor'); | |
$ReaderPool = new MyPool(READER_THREADS, 'Executor'); | |
$IndexerPool->submit(new AsyncIndexer(START_DIR)); | |
sleep(5); | |
while(true) { | |
echo "Collecting...\n"; | |
$IndexerPool->collect(function (AsyncIndexer $worker) use ($IndexerPool, $ReaderPool) { | |
echo "Collect(): on $worker->Path\n"; | |
var_dump($worker); | |
// while(count($worker->tDirpaths)) { | |
// $Dirpath = array_shift($worker->tDirpaths); | |
// echo "Creating new AsyncIndexer for {$Dirpath}\n"; | |
// $IndexerPool->submit(new AsyncIndexer($Dirpath)); | |
// } | |
if($worker->isGarbage()) { | |
echo "{$worker->Path} isGarbage...\n"; | |
return true; | |
} | |
return false; | |
// return $worker->isGarbage(); | |
}); | |
$ReaderPool->collect(function (AsyncReader $worker) { | |
return $worker->isGarbage(); | |
}); | |
if($IndexerPool->WorkRemaining === 0 && $ReaderPool->WorkRemaining === 0) { | |
printf('$IndexerPool and $ReaderPool indicate no more work remaining, ending.'.PHP_EOL); | |
break; | |
} | |
printf("Sleeping... Indexer=%d, Reader=%d\n", $IndexerPool->WorkRemaining, $ReaderPool->WorkRemaining); | |
sleep(1); | |
} | |
$IndexerPool->shutdown(); | |
echo '$IndexerPool shutdown...'.PHP_EOL; | |
$ReaderPool->shutdown(); | |
echo '$ReaderPool shutdown...'.PHP_EOL; | |
// $TotalTime = 0; | |
// $pool->collect(function (AsyncExec $work) use(&$TotalTime, &$TotalSize) { | |
// if($work->isGarbage()) { | |
// printf("$work->Path: Completed in %.2f seconds.\n", $work->Ended - $work->Started); | |
// $TotalTime += $work->Ended - $work->Started; | |
// $TotalSize += $work->Size; | |
//// var_dump($work); | |
//// foreach($work->tOutput as $Line) | |
//// printf(" %d: %s\n", $work->num, $Line); | |
// return true; | |
// } | |
// return false; | |
// }); | |
// printf("Copied %sMB in %.2fs at %sMBps using %d threads.\n", number_format(MB($TotalSize), 2), $TotalTime, number_format(MB($TotalSize) / $TotalTime, 2), POOL_THREADS); | |
/** | |
* @param integer $x | |
* | |
* @return float | |
*/ | |
function MB($x) { return $x / 1024 / 1024; } | |
/** | |
* Timer, keeps track of start time and has properties to get timing results | |
*/ | |
class Timer { | |
/** @var float */ | |
protected $Started; | |
/** | |
* Constructor | |
*/ | |
public function __construct() { | |
$this->Started = microtime(true); | |
} | |
/** | |
* @param string $Name | |
* | |
* @return float|null | |
*/ | |
public function __get($Name='') { | |
switch($Name) { | |
case 'Elapsed': | |
return microtime(true) - $this->Started; | |
} | |
return NULL; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Having trouble with the array in Async indexer on line #54, the array is not being modified either with array_push() nor with traditional ->tDirpaths[] = 'xyz';
On line #61 the var_dump indicates that the $tDirpaths has no entries.
Running the entire code the echo's on Line #55 shows that it is getting the data.
Any ideas?