Skip to content

Instantly share code, notes, and snippets.

@ramsey

ramsey/child.php Secret

Created January 28, 2023 00:37
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ramsey/c76cbd243f1d73b9efcbc7702701946a to your computer and use it in GitHub Desktop.
Save ramsey/c76cbd243f1d73b9efcbc7702701946a to your computer and use it in GitHub Desktop.
<?php
declare(strict_types=1);
use Amp\Log\ConsoleFormatter;
use Amp\Log\StreamHandler;
use Amp\Sync\Channel;
use Monolog\Logger;
use Monolog\Processor\PsrLogMessageProcessor;
use function Amp\ByteStream\getStdout;
// Create the "daemon" channel logger and turn off Monolog's logging loop detection.
$logger = new Logger('daemon');
$logger->useLoggingLoopDetection(false);

// Console handler writing to this process's STDOUT stream; the PSR-3 message
// processor is attached so "{placeholder}" tokens in messages get interpolated.
$logHandler = new StreamHandler(getStdout());
$logHandler->setFormatter(new ConsoleFormatter());
$logHandler->pushProcessor(new PsrLogMessageProcessor());

$logger->pushHandler($logHandler);
/*
 * Closure returned to whoever includes this script.
 *
 * It loops forever: each cycle sends the next 100 consecutive integers
 * (1..100, then 101..200, ...) over the given channel, logs a debug line,
 * and then blocks for three seconds with a plain sleep().
 */
return function (Channel $channel) use ($logger): void {
    $lastSent = 0;

    do {
        // Emit one burst of 100 sequential event numbers.
        $burstEnd = $lastSent + 100;
        while ($lastSent < $burstEnd) {
            $lastSent++;
            $channel->send($lastSent);
        }

        $logger->debug('Sleeping for 3 seconds to simulate blocking');

        // Deliberately blocking call inside the child process.
        sleep(3);
    } while (true); // @phpstan-ignore-line
};
<?php
declare(strict_types=1);
use Amp\Log\ConsoleFormatter;
use Amp\Log\StreamHandler;
use Amp\Parallel\Context\ProcessContext;
use Amp\Pipeline\Pipeline;
use Amp\SignalCancellation;
use Dotenv\Dotenv;
use Generator;
use Monolog\Logger;
use Monolog\Processor\PsrLogMessageProcessor;
use function Amp\ByteStream\getStdout;
use function Amp\ByteStream\pipe;
use function Amp\async;
use function Amp\delay;
use const PHP_EOL;
use const SIGINT;
use const SIGTERM;
require __DIR__ . '/vendor/autoload.php';
(function () {
    // Console logger on the "daemon" channel, with loop detection switched off.
    $logger = new Logger('daemon');
    $logger->useLoggingLoopDetection(false);

    $consoleHandler = new StreamHandler(getStdout());
    $consoleHandler->setFormatter(new ConsoleFormatter());
    $consoleHandler->pushProcessor(new PsrLogMessageProcessor());
    $logger->pushHandler($consoleHandler);

    // Cancellation tied to SIGINT/SIGTERM; the subscriber prints a newline,
    // logs the shutdown message, and terminates this process with status 0.
    $shutdownSignal = new SignalCancellation([SIGINT, SIGTERM]);
    $shutdownSignal->subscribe(function () use ($logger) {
        echo PHP_EOL;
        $logger->info('Received interrupt signal, stopping daemon');

        exit(0);
    });

    // Launch child.php (next to this file) in a separate process context.
    $child = ProcessContext::start(__DIR__ . '/child.php', cancellation: $shutdownSignal);

    // Forward everything the child writes to its STDOUT onto this process's STDOUT.
    async(function () use ($child) {
        return pipe($child->getStdout(), getStdout());
    });

    $logger->debug('Waiting 2 seconds to start...');
    delay(2);

    // Endless generator: yields each value received from the child context.
    $receiveEvents = function () use ($child): Generator {
        for (;;) { // @phpstan-ignore-line
            /** @var int $received */
            $received = $child->receive();

            yield $received; // phpcs:ignore
        }
    };

    $pipeline = Pipeline::fromIterable($receiveEvents);

    // Process 10 values concurrently; results may be consumed eagerly and out of order.
    $pipeline->concurrent(10)->unordered();

    // Handler run for every value the pipeline receives from the generator.
    $handleEvent = function (int $value) use ($logger): void {
        $logger->info('Processing data', ['id' => $value]);
    };

    $pipeline->forEach($handleEvent);
})();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment