Skip to content

Instantly share code, notes, and snippets.

@krakjoe
Last active October 11, 2023 19:07
Show Gist options
  • Star 27 You must be signed in to star a gist
  • Fork 3 You must be signed in to fork a gist
  • Save krakjoe/0ee02b887288720d9b785c9f947f3a0a to your computer and use it in GitHub Desktop.
Save krakjoe/0ee02b887288720d9b785c9f947f3a0a to your computer and use it in GitHub Desktop.
parallel Futures, Channels (buffered, unbuffered, synchros), Events using parallel producer/consumer pattern
<?php
use \parallel\{Runtime, Future, Channel, Events};
/* usage php crawler.php [http://example.com] [workers=8] [limit=500] */
$page = $argv[1] ?: "https://blog.krakjoe.ninja"; # start crawling this page
$workers = $argv[2] ?: 8; # start this number of threads
$limit = $argv[3] ?: 500; # stop at this number of unique pages
$timeout = $argv[4] ?: 3; # socket timeout for producers
#############################################################################################
$producer = function(int $worker, int $timeout){
libxml_use_internal_errors(true);
ini_set('default_socket_timeout', $timeout);
$crawling = true;
$produce = Channel::open("crawler.production");
$consume = Channel::open("crawler.consumption");
$errors = Channel::open("management.errors");
$manager = Channel::open("management.{$worker}");
while ($url = $produce->recv()) {
printf("Producer %ld working %s\n", $worker, $url);
$html = @file_get_contents($url);
if (!$html) {
/* inform manager of errors */
$errors->send($url);
continue;
}
$consume->send([
"href" => $url,
"content" => $html
]);
if ($crawling) {
$parsed = parse_url($url);
$docroot = sprintf(
"%s://%s",
$parsed["scheme"],
$parsed["host"]
);
$dom = new DOMDocument();
$dom->loadHTML($html);
foreach ($dom->getElementsByTagName("a") as $anchor) {
$href = $anchor->getAttribute("href");
if (!$href || strpos($href, $docroot) !== 0) {
continue;
}
/* do management check */
$manager->send($href);
if (($result = $manager->recv()) === -1) {
/* manager says we hit limits,
tell this (or another) producer to shutdown */
$produce->send(
$crawling = false);
break;
} else {
if ($result) {
/* allowed to add */
$produce->send($href);
}
}
}
}
}
if ($crawling) {
/* if still crawling, tell next producer to quit */
$produce->send(false);
}
/* notify a consumer to shutdown,
this degrades consumers gracefully as producers
are shutdown */
$consume->send(false);
/* notify manager, producer done */
$manager->close();
};
$consumer = function($worker){
/* the consumer doesn't do anything, just prints what it got */
$consume = Channel::open("crawler.consumption");
while ($result = $consume->recv()) {
printf("Consumer %ld working on %s with %d bytes\n",
$worker, $result["href"], strlen($result["content"]));
}
};
$manager = function(string $page, int $workers, int $limit){
$events = new Events;
$index = [
$page => true
];
$closing = 0;
$failing = [];
/* add error channel */
$events->addChannel(
Channel::open("management.errors"));
/* open and add management channels for producers */
for ($worker = 0; $worker < $workers; $worker++) {
$events->addChannel(
Channel::open("management.{$worker}"));
}
foreach ($events as $event) {
/* we have notification of an error */
if ($event->source == "management.errors") {
$failing[
/* update failing list */
] = $event;
$events->addChannel($event->object);
continue;
}
/* producer closed management channel */
if ($event->type == Events\Event\Type::Close) {
if (++$closing == $workers) {
/* all producers closed,
no more errors are coming */
$events->remove("management.errors");
}
continue;
}
/* index check */
if (count($index) == $limit) {
/* reached limit of index,
producer will not send any more data */
$event->object->send(-1);
} else {
if (isset($index[$event->value])) {
/* already exists in index, do not add */
$event->object->send(false);
} else {
/* set in index and allow caller to add */
$index[
$event->value
] = true;
$event->object->send(true);
}
}
/* expect another event on this channel */
$events->addChannel($event->object);
}
return ["ok" => count($index), "fail" => count($failing)];
};
$make = function(Closure $closure, array $argv = []) : Future {
$runtime =
new Runtime;
return $runtime->run($closure, $argv);
};
$run = function(string $page, int $workers, int $limit, int $timeout)
use($make, $producer, $consumer, $manager) {
$produce = Channel::make("crawler.production", Channel::Infinite);
$consume = Channel::make("crawler.consumption");
$errors = Channel::make("management.errors", Channel::Infinite);
$producers = [];
$consumers = [];
$managers = [];
$events = new Events;
$start = microtime(true);
if ($workers >= $limit) {
$workers = $limit;
}
for ($worker = 0; $worker < $workers; $worker++) {
/* create management channel */
$managers[$worker] =
Channel::make("management.{$worker}");
/* create producer */
$producers[$worker] =
$make($producer, [$worker, $timeout]);
/* create consumer */
$consumers[$worker] =
$make($consumer, [$worker]);
/* add consumer to event loop */
$events->addFuture($worker, $consumers[$worker]);
}
/* create manager */
$management =
$make($manager, [$page, $workers, $limit]);
/* start */
$produce->send($page);
/* wait for consumers to close */
while ($event = $events->poll());
/* fetch result from manager */
$result =
$management->value();
printf("Finished with %d pages (%d %s) in %.2f seconds\n",
$result["ok"],
$result["fail"],
$result["fail"] == 1 ?
"fail" : "failures",
microtime(true) - $start);
};
$run($page, $workers, $limit, $timeout);
?>
@danbadds38
Copy link

this is awesome!

@CViniciusSDias
Copy link

I'm still not able to understand Events. :'(
It's my first time reading parallel's documentation today and I stumbled into this code. Not easy at all. Do you have any simpler example or some tutorial on what are and when to use events?

@ashishpadave
Copy link

Could you please help me on how to get the results or all workers in the final output? Assuming that each worker returns a single integer value. I would like to get an array of these integers in the output (run function)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment