Skip to content

Instantly share code, notes, and snippets.

@ramsey
Last active January 27, 2023 23:09
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ramsey/582bea383bc9d1f31019da8546681953 to your computer and use it in GitHub Desktop.
Save ramsey/582bea383bc9d1f31019da8546681953 to your computer and use it in GitHub Desktop.
<?php
use Amp\Log\ConsoleFormatter;
use Amp\Log\StreamHandler;
use Amp\Pipeline\Pipeline;
use Amp\SignalCancellation;
use Generator;
use Monolog\Logger;
use Monolog\Processor\PsrLogMessageProcessor;
use function Amp\ByteStream\getStdout;
use function Amp\delay;
use const PHP_EOL;
use const SIGINT;
use const SIGTERM;
$logHandler = new StreamHandler(getStdout());
$logHandler->pushProcessor(new PsrLogMessageProcessor());
$logHandler->setFormatter(new ConsoleFormatter());
$logger = new Logger('daemon');
$logger->pushHandler($logHandler);
$logger->useLoggingLoopDetection(false);
$signalCancellation = new SignalCancellation([SIGINT, SIGTERM]);
$signalCancellation->subscribe(function () use ($logger) {
echo PHP_EOL;
$logger->info('Received interrupt signal, stopping daemon');
exit(0);
});
$pipeline = Pipeline::fromIterable(function () use ($signalCancellation): Generator {
$i = 0;
while (true) { // @phpstan-ignore-line
yield ++$i; // phpcs:ignore
if ($i % 51 === 0) {
// Simulate when no items are found, we'll delay and wait a little.
yield -1;
delay(5, cancellation: $signalCancellation);
}
}
});
$pipeline
->concurrent(10) // Process 10 values concurrently
->unordered() // Results may be consumed eagerly and out of order
->delay(1); // Delay for 1 second to simulate I/O
/**
* Do something for each value the pipeline receives from the generator.
*/
$pipeline->forEach(
/**
* @param int $value
*/
function (int $value) use ($logger): void {
if ($value === -1) {
$logger->info('Process is idle; waiting for data');
} else {
$logger->info('Processing data', ['id' => $value]);
}
},
);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment