Skip to content

Instantly share code, notes, and snippets.

@makomweb
Last active January 24, 2024 12:54
Show Gist options
  • Save makomweb/2ff57b3dd9e46a80bdb7b143c2c26dc9 to your computer and use it in GitHub Desktop.
Save makomweb/2ff57b3dd9e46a80bdb7b143c2c26dc9 to your computer and use it in GitHub Desktop.
Reactive Extensions are fun 😃
<?php
declare(strict_types=1);
namespace App\Command;
use Rx\Observable;
use Rx\Scheduler;
use Rx\Scheduler\ImmediateScheduler;
use Symfony\Component\Console\Attribute\AsCommand;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Console\Style\SymfonyStyle;
use Throwable;
/**
 * Demo console command that pushes a nested array of integers through an Rx
 * pipeline, prints every emitted value and maps the way the stream terminates
 * onto the command's exit code.
 */
#[AsCommand(
    name: 'app:my-test',
    description: 'My 1st Rx Symfony CLI command',
)]
class MyTest extends Command
{
    /**
     * Registers the factory Rx uses to build its default scheduler.
     *
     * Note that this is a process-wide, static setting on Rx\Scheduler, not
     * something scoped to this command instance.
     */
    public function __construct()
    {
        parent::__construct();

        Scheduler::setDefaultFactory(static fn () => new ImmediateScheduler());
    }

    /**
     * Flattens [[1,2], [4,5,6,7], [8,9]] into a single stream and reports each
     * integer as a note.
     *
     * @return int Command::SUCCESS when the stream completes, Command::FAILURE
     *             when it errors, and Command::INVALID if neither terminal
     *             callback has fired by the time subscribe() returns.
     */
    protected function execute(InputInterface $input, OutputInterface $output): int
    {
        $io = new SymfonyStyle($input, $output);
        $io->title('Starting sequence:');

        // Stays INVALID unless one of the terminal callbacks below overwrites it.
        // The value is read right after subscribe(), so this relies on the
        // scheduler delivering every notification before subscribe() returns.
        $returnCode = Command::INVALID;

        $onNext = static function (int $data) use ($io) {
            $io->note(sprintf('Received: %d', $data));
        };

        $onError = static function (Throwable $ex) use ($io, &$returnCode) {
            $io->error($ex->getMessage());
            $returnCode = Command::FAILURE;
        };

        $onCompleted = static function () use ($io, &$returnCode) {
            $io->info('Completed!');
            $returnCode = Command::SUCCESS;
        };

        $batches = Observable::fromArray([[1,2], [4,5,6,7], [8,9]]);

        // Each inner array becomes its own observable; flatMap merges them.
        $numbers = $batches->flatMap(
            static fn (array $items) => Observable::fromArray($items)
        );

        $numbers->subscribe($onNext, $onError, $onCompleted);

        return $returnCode;
    }
}
@makomweb
Copy link
Author

To use the EventLoopScheduler:

  1. add an eventloop to the project via composer require react/event-loop
  2. use new EventLoopScheduler(Loop::get()) instead

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment