Skip to content

Instantly share code, notes, and snippets.

@trowski
Created January 27, 2023 23:17
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save trowski/28c8fe782f70071691080ca3f9a8fda5 to your computer and use it in GitHub Desktop.
Save trowski/28c8fe782f70071691080ca3f9a8fda5 to your computer and use it in GitHub Desktop.
<?php
use Amp\Cache\Cache;
use Amp\Cancellation;
use Amp\Parallel\Worker\Task;
use Amp\Sync\Channel;
use function Amp\delay;
class EventPollingTask implements Task
{
public function run(Channel $channel, Cache $cache, Cancellation $cancellation): mixed
{
while (true) {
$event = $this->pollForEvent($cancellation);
if ($event) {
$channel->send($event);
} else {
delay(1, cancellation: $cancellation);
}
}
}
private function pollForEvent(Cancellation $cancellation): mixed
{
}
}
<?php
require_once __DIR__ . '/../vendor/autoload.php';
use Amp\DeferredCancellation;
use Amp\Log\ConsoleFormatter;
use Amp\Log\StreamHandler;
use Amp\Pipeline\Pipeline;
use Amp\SignalCancellation;
use Monolog\Logger;
use Monolog\Processor\PsrLogMessageProcessor;
use function Amp\ByteStream\getStdout;
use function Amp\delay;
use const PHP_EOL;
use const SIGINT;
use const SIGTERM;
$logHandler = new StreamHandler(getStdout());
$logHandler->pushProcessor(new PsrLogMessageProcessor());
$logHandler->setFormatter(new ConsoleFormatter());
$logger = new Logger('daemon');
$logger->pushHandler($logHandler);
$logger->useLoggingLoopDetection(false);
$deferredCancellation = new DeferredCancellation();
$execution = \Amp\Parallel\Worker\submit(new EventPollingTask(), $deferredCancellation->getCancellation());
$channel = $execution->getChannel();
$signalCancellation = new SignalCancellation([SIGINT, SIGTERM]);
$signalCancellation->subscribe(function () use ($logger, $deferredCancellation) {
echo PHP_EOL;
$deferredCancellation->cancel();
$logger->info('Received interrupt signal, stopping daemon');
exit(0);
});
$pipeline = Pipeline::fromIterable(function () use ($channel): Generator {
while (true) { // @phpstan-ignore-line
$event = $channel->receive();
yield $event; // phpcs:ignore
}
});
$pipeline
->concurrent(10) // Process 10 values concurrently
->unordered(); // Results may be consumed eagerly and out of order
/**
* Do something for each value the pipeline receives from the generator.
*/
$pipeline->forEach(function (int $value) use ($logger): void {
$logger->info('Processing data', ['id' => $value]);
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment