Skip to content

Instantly share code, notes, and snippets.

Forked from krakjoe/crawler.php
Created May 20, 2019 11:12
Show Gist options
  • Save vitalyrotari/b325f79339b5e9c8f71b4b299da52801 to your computer and use it in GitHub Desktop.
Save vitalyrotari/b325f79339b5e9c8f71b4b299da52801 to your computer and use it in GitHub Desktop.
Demonstrates parallel Futures, Channels (buffered, unbuffered, and used for synchronization), and Events in a parallel producer/consumer pattern.
use \parallel\{Runtime, Future, Channel, Events};
/* usage: php crawler.php [page] [workers=8] [limit=500] [timeout=3]
 *
 * Each index is guarded with `??` so an omitted argument does not raise an
 * "Undefined array key" warning. The numeric arguments are cast to int so the
 * typed closures receive real integers. A missing, empty, zero or non-numeric
 * numeric argument falls back to its default instead of triggering a
 * TypeError later. */
$page    = (string) ($argv[1] ?? "");    # start crawling this page
$workers = (int) ($argv[2] ?? 0) ?: 8;   # start this number of threads
$limit   = (int) ($argv[3] ?? 0) ?: 500; # stop at this number of unique pages
$timeout = (int) ($argv[4] ?? 0) ?: 3;   # socket timeout for producers
/*
 * Producer task, started once per worker by $run (through $make).
 *
 * Pulls URLs from the "crawler.production" channel until recv() yields a
 * falsy value and fetches each page. While still crawling, it consults its
 * own "management.{$worker}" channel about each same-site link it finds.
 *
 * @param int $worker  index of this worker; selects the management channel
 * @param int $timeout value applied to default_socket_timeout (seconds)
 *
 * NOTE(review): this copy of the closure is incomplete. It does not parse
 * as shown. Missing pieces include:
 *   - the statements that appear to send on $consume, $errors, $manager
 *     and $produce ($consume and $errors are opened but never used below);
 *   - the bodies of the filter/error branches;
 *   - every closing brace.
 */
$producer = function(int $worker, int $timeout){
ini_set('default_socket_timeout', $timeout);
$crawling = true;
$produce = Channel::open("crawler.production");
$consume = Channel::open("crawler.consumption");
$errors = Channel::open("management.errors");
$manager = Channel::open("management.{$worker}");
while ($url = $produce->recv()) {
printf("Producer %ld working %s\n", $worker, $url);
/* `@` hides fetch warnings; a failed fetch (false) or an empty body is
   caught by the falsy check below */
$html = @file_get_contents($url);
if (!$html) {
/* inform manager of errors */
/* NOTE(review): the report to $errors and the statement that skips to the
   next URL appear to be missing here */
/* NOTE(review): orphaned array elements - presumably the payload of a
   send on $consume; the opening call line is missing */
"href" => $url,
"content" => $html
if ($crawling) {
$parsed = parse_url($url);
/* NOTE(review): the sprintf format and arguments are missing; $docroot is
   presumably "scheme://host" built from $parsed - confirm */
$docroot = sprintf(
$dom = new DOMDocument();
/* NOTE(review): no loadHTML() call is visible, so as shown $dom is empty
   and the loop below finds no anchors */
foreach ($dom->getElementsByTagName("a") as $anchor) {
$href = $anchor->getAttribute("href");
/* only hrefs that begin with $docroot are considered; relative hrefs
   never match (assuming $docroot is an absolute URL prefix) */
if (!$href || strpos($href, $docroot) !== 0) {
/* do management check */
/* a reply of -1 from the manager means the page limit was reached */
if (($result = $manager->recv()) === -1) {
/* manager says we hit limits,
tell this (or another) producer to shutdown */
/* NOTE(review): unbalanced ")" - this is the tail of a call whose first
   line is missing (presumably a send on $produce) */
$crawling = false);
} else {
if ($result) {
/* allowed to add */
/* NOTE(review): the statement that queues $href is missing */
if ($crawling) {
/* if still crawling, tell next producer to quit */
/* notify a consumer to shutdown,
this degrades consumers gracefully as producers
are shutdown */
/* notify manager, producer done */
/*
 * Consumer task, started once per worker by $run (through $make).
 *
 * Receives page results from "crawler.consumption" until recv() yields a
 * falsy value, and prints the URL and body length of each one. Each result
 * is read as an array with "href" and "content" keys.
 *
 * @param mixed $worker index of this worker, used only in the log line
 *
 * NOTE(review): the closing braces of the while loop and of the closure are
 * missing from this copy.
 */
$consumer = function($worker){
/* the consumer doesn't do anything, just prints what it got */
$consume = Channel::open("crawler.consumption");
while ($result = $consume->recv()) {
/* strlen() counts bytes, which matches the "bytes" wording */
printf("Consumer %ld working on %s with %d bytes\n",
$worker, $result["href"], strlen($result["content"]));
/*
 * Manager task: owns the set of discovered URLs and enforces the page limit.
 *
 * Seeds $index with $page, then iterates an Events loop:
 *   - events from "management.errors" are recorded in $failing;
 *   - Close events (a producer closing its management channel) are counted
 *     in $closing;
 *   - other event values appear to be checked against $limit and $index.
 *     The exact branch structure is lost in this copy.
 *
 * @param string $page    start URL, pre-seeded into the index
 * @param int    $workers number of producers
 * @param int    $limit   maximum number of unique URLs to index
 * @return array{ok:int, fail:int} number of indexed URLs and of failures
 *
 * NOTE(review): this copy is incomplete. Missing pieces include:
 *   - the channel set-up under the two leading comments;
 *   - the heads of the $failing/$index assignments (only their trailing
 *     "] = ..." lines survive);
 *   - the replies to producers and the event re-arming;
 *   - all closing brackets and braces.
 */
$manager = function(string $page, int $workers, int $limit){
$events = new Events;
$index = [
$page => true
/* number of producers that have closed their management channel */
$closing = 0;
$failing = [];
/* add error channel */
/* open and add management channels for producers */
for ($worker = 0; $worker < $workers; $worker++) {
foreach ($events as $event) {
/* we have notification of an error */
if ($event->source == "management.errors") {
/* update failing list */
/* NOTE(review): fragment - presumably the tail of an assignment into
   $failing; the opening line is missing */
] = $event;
/* producer closed management channel */
if ($event->type == Events\Event\Type::Close) {
if (++$closing == $workers) {
/* all producers closed,
no more errors are coming */
/* index check */
/* NOTE(review): additions only happen in the else branch below, so `==`
   holds today; `>=` would be more defensive */
if (count($index) == $limit) {
/* reached limit of index,
producer will not send any more data */
} else {
if (isset($index[$event->value])) {
/* already exists in index, do not add */
} else {
/* set in index and allow caller to add */
/* NOTE(review): fragment - presumably the tail of an assignment into
   $index; the opening line is missing */
] = true;
/* expect another event on this channel */
return ["ok" => count($index), "fail" => count($failing)];
/*
 * Run $closure in a brand-new parallel\Runtime with the given arguments.
 *
 * A separate Runtime is created on every call, so each producer, each
 * consumer and the manager run in their own runtime.
 *
 * @param Closure $closure task to execute
 * @param array   $argv    positional arguments passed to the task
 * @return Future handle for the task's eventual return value
 *
 * NOTE(review): the closing "};" of this closure is missing from this copy.
 */
$make = function(Closure $closure, array $argv = []) : Future {
$runtime =
new Runtime;
return $runtime->run($closure, $argv);
/*
 * Entry point. It performs these steps:
 *   - creates the production and error channels with Channel::Infinite
 *     capacity, and the consumption channel with no capacity argument;
 *   - caps $workers at $limit;
 *   - starts one producer and one consumer per worker plus a single manager,
 *     each through $make;
 *   - registers the consumer futures with an Events loop and polls it until
 *     poll() returns a falsy value;
 *   - prints a summary with the elapsed wall-clock time.
 *
 * @param string $page    start URL handed to the manager
 * @param int    $workers requested number of producer/consumer pairs
 * @param int    $limit   maximum number of unique pages
 * @param int    $timeout socket timeout forwarded to each producer
 *
 * NOTE(review): this copy is incomplete. Missing pieces include:
 *   - closing braces;
 *   - the right-hand side of the management-channel assignment;
 *   - the statements under "start";
 *   - the fetch of the manager's result;
 *   - apparently one printf argument line.
 */
$run = function(string $page, int $workers, int $limit, int $timeout)
use($make, $producer, $consumer, $manager) {
$produce = Channel::make("crawler.production", Channel::Infinite);
/* no capacity argument: presumably unbuffered (ext-parallel default) - confirm */
$consume = Channel::make("crawler.consumption");
$errors = Channel::make("management.errors", Channel::Infinite);
$producers = [];
$consumers = [];
$managers = [];
$events = new Events;
$start = microtime(true);
/* never start more workers than the number of pages allowed */
if ($workers >= $limit) {
$workers = $limit;
for ($worker = 0; $worker < $workers; $worker++) {
/* create management channel */
/* NOTE(review): right-hand side missing - as written this assignment chains
   into the next one, so $managers[$worker] would receive the producer Future */
$managers[$worker] =
/* create producer */
$producers[$worker] =
$make($producer, [$worker, $timeout]);
/* create consumer */
$consumers[$worker] =
$make($consumer, [$worker]);
/* add consumer to event loop */
$events->addFuture($worker, $consumers[$worker]);
/* create manager */
$management =
$make($manager, [$page, $workers, $limit]);
/* start */
/* wait for consumers to close */
/* NOTE(review): relies on Events::poll() blocking (confirm against the
   ext-parallel docs); otherwise this empty-body loop busy-waits */
while ($event = $events->poll());
/* fetch result from manager */
/* NOTE(review): right-hand side missing - as written, $result is assigned
   the return value of the printf() call below */
$result =
/* NOTE(review): the format has four conversions but only two arguments are
   visible. Also, "fail" vs "failures" mixes a verb with a plural noun;
   "failure" would match the plural form */
printf("Finished with %d pages (%d %s) in %.2f seconds\n",
$result["fail"] == 1 ?
"fail" : "failures",
microtime(true) - $start);
/* run the crawler with the command-line settings */
$run($page, $workers, $limit, $timeout);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment