Skip to content

Instantly share code, notes, and snippets.

@krakjoe
Last active October 11, 2023 19:07
Show Gist options
  • Save krakjoe/3c4efb20e14db01bb0b9bc88dd14ff7c to your computer and use it in GitHub Desktop.
Save krakjoe/3c4efb20e14db01bb0b9bc88dd14ff7c to your computer and use it in GitHub Desktop.
Executor Service with Parallel and Channels
<?php
use \parallel\{Runtime, Channel};
/**
 * Fixed-size pool of parallel\Runtime workers that pull jobs from one shared,
 * named Channel.
 *
 * Each job is sent over the channel as an array with the keys "function"
 * (the Closure to call) and "argv" (the arguments to spread into it).
 * The boolean false is used as the shutdown sentinel: the destructor sends one
 * per worker and every worker stops as soon as it receives one.
 */
class ExecutorService {
    /**
     * Channel that carries jobs and shutdown sentinels to the workers.
     *
     * Declared explicitly: creating it dynamically is deprecated since PHP 8.2.
     *
     * @var Channel
     */
    private $channel;

    /**
     * Runtimes started by the constructor, indexed 0..n-1.
     *
     * @var Runtime[]
     */
    private $workers = [];

    /**
     * Create the channel and start the workers.
     *
     * @param int    $workers number of Runtime workers to start; must be at least 1
     * @param string $channel name passed to Channel::make(); defaults to the class
     *                        name, so two services using the default name would ask
     *                        for the same channel name (NOTE(review): confirm how
     *                        Channel::make() treats a name that already exists)
     * @param int    $backlog 0 creates the channel without a capacity argument,
     *                        any other value is passed through as the capacity
     *
     * @throws InvalidArgumentException when $workers is smaller than 1
     */
    public function __construct(int $workers, string $channel = __CLASS__, int $backlog = Channel::Infinite) {
        if ($workers < 1) {
            /*
             * Without any consumer, jobs would either never run or
             * execute() would block forever
             */
            throw new InvalidArgumentException(
                "ExecutorService requires at least one worker, {$workers} given");
        }

        if ($backlog === 0) {
            /*
             * execute() will block until a worker is ready
             */
            $this->channel = Channel::make($channel);
        } else {
            /*
             * execute() will not block (until backlog is reached)
             */
            $this->channel = Channel::make($channel, $backlog);
        }

        for ($worker = 0; $worker < $workers; $worker++) {
            $this->workers[$worker] = new Runtime;
            /*
             * Only the channel name is handed to the task;
             * the worker re-opens the channel by that name
             */
            $this->workers[$worker]->run(
                Closure::fromCallable(
                    [self::class, "__thread"]),
                [(string)$this->channel]);
        }
    }

    /**
     * Worker loop: open the named channel and run jobs until a falsy value
     * (the shutdown sentinel) is received.
     *
     * @param string $channel name of the channel to open
     */
    private static function __thread(string $channel) {
        $channel = Channel::open($channel);

        while (($job = $channel->recv())) {
            try {
                ($job["function"])(...$job["argv"]);
            } catch (\Throwable $error) {
                /*
                 * A failing job must not end the loop: this worker still
                 * has to consume its shutdown sentinel, otherwise the
                 * destructor could wait on send() with nobody receiving
                 */
                error_log(sprintf(
                    "%s: job failed with %s: %s",
                    __CLASS__,
                    get_class($error),
                    $error->getMessage()));
            }
        }
    }

    /**
     * Queue a closure for execution by one of the workers.
     *
     * @param Closure $closure code to run in a worker
     * @param array   $argv    arguments spread into $closure; may be omitted
     *                         for closures that take no arguments
     */
    public function execute(Closure $closure, array $argv = []) {
        $this->channel->send([
            "function" => $closure,
            "argv" => $argv
        ]);
    }

    /**
     * Shut the pool down: one sentinel per worker, then close every runtime
     * and finally the channel.
     */
    public function __destruct() {
        /*
         * Notify workers to shutdown
         */
        foreach ($this->workers as $worker) {
            $this->channel->send(false);
        }

        /*
         * Close workers
         */
        foreach ($this->workers as $worker) {
            $worker->close();
        }

        /*
         * Close channel
         */
        $this->channel->close();
    }
}
/*
 * Demo: start four workers and queue 999 jobs (1..999), each printing its
 * job number together with the id of the thread that ran it.
 *
 * NOTE(review): zend_thread_id() does not appear to exist on every PHP
 * build - confirm it is available on the target build.
 */
$executor = new ExecutorService(4);

/*
 * The counter is initialised explicitly; incrementing an undefined
 * variable raises an "Undefined variable" warning
 */
for ($i = 1; $i < 1000; $i++) {
    $executor->execute(function($i){
        printf("{$i}: Hello from #%d\n",
            zend_thread_id());
    }, [$i]);
}
?>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment