@krakjoe
Last active October 11, 2023 19:07
parallel Futures, Channels (buffered, unbuffered, synchros), Events using parallel producer/consumer pattern
<?php
use \parallel\{Runtime, Future, Channel, Events};
/* usage: php crawler.php [http://example.com] [workers=8] [limit=500] [timeout=3] */
$page = $argv[1] ?? "https://blog.krakjoe.ninja"; # start crawling this page
$workers = (int) ($argv[2] ?? 8); # start this number of threads
$limit = (int) ($argv[3] ?? 500); # stop at this number of unique pages
$timeout = (int) ($argv[4] ?? 3); # socket timeout for producers, in seconds
#############################################################################################
$producer = function(int $worker, int $timeout){
libxml_use_internal_errors(true);
ini_set('default_socket_timeout', $timeout);
$crawling = true;
$produce = Channel::open("crawler.production");
$consume = Channel::open("crawler.consumption");
$errors = Channel::open("management.errors");
$manager = Channel::open("management.{$worker}");
while ($url = $produce->recv()) {
printf("Producer %ld working %s\n", $worker, $url);
$html = @file_get_contents($url);
if (!$html) {
/* inform manager of errors */
$errors->send($url);
continue;
}
$consume->send([
"href" => $url,
"content" => $html
]);
if ($crawling) {
$parsed = parse_url($url);
$docroot = sprintf(
"%s://%s",
$parsed["scheme"],
$parsed["host"]
);
$dom = new DOMDocument();
$dom->loadHTML($html);
foreach ($dom->getElementsByTagName("a") as $anchor) {
$href = $anchor->getAttribute("href");
if (!$href || strpos($href, $docroot) !== 0) {
continue;
}
/* do management check */
$manager->send($href);
$result = $manager->recv();
if ($result === -1) {
/* manager says we hit limits,
tell this (or another) producer to shutdown */
$crawling = false;
$produce->send(false);
break;
}
if ($result) {
/* allowed to add */
$produce->send($href);
}
}
}
}
if ($crawling) {
/* if still crawling, tell next producer to quit */
$produce->send(false);
}
/* notify a consumer to shutdown,
this degrades consumers gracefully as producers
are shutdown */
$consume->send(false);
/* notify manager, producer done */
$manager->close();
};
$consumer = function($worker){
/* the consumer doesn't do anything, just prints what it got */
$consume = Channel::open("crawler.consumption");
while ($result = $consume->recv()) {
printf("Consumer %ld working on %s with %d bytes\n",
$worker, $result["href"], strlen($result["content"]));
}
};
$manager = function(string $page, int $workers, int $limit){
$events = new Events;
$index = [
$page => true
];
$closing = 0;
$failing = [];
/* add error channel */
$events->addChannel(
Channel::open("management.errors"));
/* open and add management channels for producers */
for ($worker = 0; $worker < $workers; $worker++) {
$events->addChannel(
Channel::open("management.{$worker}"));
}
foreach ($events as $event) {
/* we have notification of an error */
if ($event->source == "management.errors") {
/* update failing list */
$failing[] = $event;
$events->addChannel($event->object);
continue;
}
/* producer closed management channel */
if ($event->type == Events\Event\Type::Close) {
if (++$closing == $workers) {
/* all producers closed,
no more errors are coming */
$events->remove("management.errors");
}
continue;
}
/* index check */
if (count($index) >= $limit) {
/* reached limit of index,
producer will not send any more data */
$event->object->send(-1);
} else {
if (isset($index[$event->value])) {
/* already exists in index, do not add */
$event->object->send(false);
} else {
/* set in index and allow caller to add */
$index[
$event->value
] = true;
$event->object->send(true);
}
}
/* expect another event on this channel */
$events->addChannel($event->object);
}
return ["ok" => count($index), "fail" => count($failing)];
};
$make = function(Closure $closure, array $argv = []) : Future {
$runtime =
new Runtime;
return $runtime->run($closure, $argv);
};
$run = function(string $page, int $workers, int $limit, int $timeout)
use($make, $producer, $consumer, $manager) {
$produce = Channel::make("crawler.production", Channel::Infinite);
$consume = Channel::make("crawler.consumption");
$errors = Channel::make("management.errors", Channel::Infinite);
$producers = [];
$consumers = [];
$managers = [];
$events = new Events;
$start = microtime(true);
if ($workers >= $limit) {
$workers = $limit;
}
for ($worker = 0; $worker < $workers; $worker++) {
/* create management channel */
$managers[$worker] =
Channel::make("management.{$worker}");
/* create producer */
$producers[$worker] =
$make($producer, [$worker, $timeout]);
/* create consumer */
$consumers[$worker] =
$make($consumer, [$worker]);
/* add consumer to event loop */
$events->addFuture($worker, $consumers[$worker]);
}
/* create manager */
$management =
$make($manager, [$page, $workers, $limit]);
/* start */
$produce->send($page);
/* wait for consumers to close */
while ($event = $events->poll());
/* fetch result from manager */
$result =
$management->value();
printf("Finished with %d pages (%d %s) in %.2f seconds\n",
$result["ok"],
$result["fail"],
$result["fail"] == 1 ?
"fail" : "failures",
microtime(true) - $start);
};
$run($page, $workers, $limit, $timeout);
?>
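
The manager's reply protocol is easier to see in isolation. The sketch below restates the index check from `$manager` as a plain function, without threads or channels: it answers `-1` once the index holds `$limit` URLs, `false` for a URL that is already indexed, and `true` for a new one, which it also records. The name `decide` is hypothetical and not part of the gist or of parallel, and the limit comparison is written as `>=` here.

```php
<?php
/**
 * Hypothetical helper mirroring the manager's index check.
 * Returns -1 (limit reached), false (already indexed) or true (newly indexed).
 */
function decide(array &$index, int $limit, string $url)
{
    if (count($index) >= $limit) {
        return -1;
    }
    if (isset($index[$url])) {
        return false;
    }
    $index[$url] = true;
    return true;
}

$index = ["https://example.com/" => true]; // the start page is indexed up front
var_dump(decide($index, 3, "https://example.com/a")); // bool(true)
var_dump(decide($index, 3, "https://example.com/a")); // bool(false)
var_dump(decide($index, 3, "https://example.com/b")); // bool(true)
var_dump(decide($index, 3, "https://example.com/c")); // int(-1)
```

A producer reacts to these three answers exactly as in the script: `true` means queue the link, `false` means skip it, and `-1` means stop crawling and begin shutting down.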
@Tuckbros

Tuckbros commented Sep 21, 2019

For those who have to compile PHP with thread safety and have trouble making this work out of the box: make sure your PHP build also supports https (OpenSSL enabled). Otherwise, you can use an http page (as long as it does not redirect to https). The symptom becomes obvious when you remove the '@' at line 26 (the error-suppression operator in front of file_get_contents).
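
A quick way to check these requirements before running the crawler is sketched below. It relies only on core PHP (`PHP_ZTS`, `extension_loaded`, `stream_get_wrappers`); the function name `missingCrawlerRequirements` is made up for this example.

```php
<?php
/**
 * Hypothetical helper: lists what this PHP build lacks for running the crawler.
 * An empty array means thread safety, parallel and https support are all present.
 */
function missingCrawlerRequirements(): array
{
    $missing = [];
    if (!PHP_ZTS) {
        $missing[] = "thread-safe (ZTS) build";
    }
    if (!extension_loaded("parallel")) {
        $missing[] = "parallel extension";
    }
    if (!in_array("https", stream_get_wrappers(), true)) {
        $missing[] = "https stream wrapper (OpenSSL)";
    }
    return $missing;
}

foreach (missingCrawlerRequirements() as $item) {
    printf("Missing: %s\n", $item);
}
```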

@niloct

niloct commented Oct 21, 2019

Can I pass object callables to $make? For example \Closure::fromCallable(array($this, "my_method")), assuming $this is an instance of a class that holds a parallel instance in one of its properties. That's what I am trying to accomplish.

@danbadds38

this is awesome!

@CViniciusSDias

I'm still not able to understand Events. :'(
It's my first time reading parallel's documentation today and I stumbled into this code. Not easy at all. Do you have a simpler example or a tutorial on what Events are and when to use them?
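
As a smaller starting point, here is a minimal sketch of an `Events` loop that waits on several Futures at once and handles each one as soon as it completes, instead of blocking on them one at a time. It assumes the parallel extension is loaded; the function name `squaresInParallel` and the task names are invented for the example.

```php
<?php
use parallel\{Runtime, Events};

/* Hypothetical example: square each number in its own thread and
   collect the results in completion order through an Events loop. */
function squaresInParallel(array $numbers): array
{
    $events   = new Events;
    $runtimes = [];
    $results  = [];

    foreach ($numbers as $n) {
        /* keep a reference to each Runtime while its task is running */
        $runtimes[$n] = new Runtime;
        $future = $runtimes[$n]->run(function (int $n): int {
            return $n * $n;
        }, [$n]);
        /* watch the Future under a name of our choosing */
        $events->addFuture("square-{$n}", $future);
    }

    /* each iteration blocks until one watched target has something to report;
       a target leaves the loop once its event is delivered, so the loop
       ends when every Future has completed */
    foreach ($events as $event) {
        $results[$event->source] = $event->value;
    }

    return $results;
}
```

With the extension available, `squaresInParallel([2, 3])` should yield the entries `"square-2" => 4` and `"square-3" => 9`, in whichever order the tasks finish. The crawler's manager uses the same loop with Channels instead of Futures, which is why it re-adds a channel after each event.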

@ashishpadave

Could you please help me get the results of all workers in the final output? Assume that each worker returns a single integer value. I would like to get an array of these integers in the output (in the run function).
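
One way to approach this, sketched under the assumption that each worker closure ends by returning its integer: keep the Futures that `Runtime::run` returns and call `value()` on each, which blocks until that worker has returned. The helper names `collectResults` and `runWorkers` are hypothetical and not part of the gist.

```php
<?php
use parallel\Runtime;

/* Hypothetical helper: wait for every Future and return the values,
   keyed like the input array. */
function collectResults(array $futures): array
{
    $results = [];
    foreach ($futures as $worker => $future) {
        /* value() blocks until the worker's closure has returned */
        $results[$worker] = $future->value();
    }
    return $results;
}

/* Hypothetical usage: start $workers threads, each returning one integer. */
function runWorkers(int $workers): array
{
    $runtimes = [];
    $futures  = [];
    for ($worker = 0; $worker < $workers; $worker++) {
        $runtimes[$worker] = new Runtime;
        $futures[$worker]  = $runtimes[$worker]->run(function (int $worker): int {
            return $worker * 10;
        }, [$worker]);
    }
    return collectResults($futures);
}
```

In the crawler itself, the consumer Futures are already watched by the `$events->poll()` loop in `$run`, so an alternative is to store `$event->value` for each event there rather than calling `value()` afterwards.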
