Skip to content

Instantly share code, notes, and snippets.

@tomas-novotny
Created May 5, 2023 10:02
Show Gist options
  • Save tomas-novotny/5abdbd4474c25e768c6eeb168c32cfc1 to your computer and use it in GitHub Desktop.
Save tomas-novotny/5abdbd4474c25e768c6eeb168c32cfc1 to your computer and use it in GitHub Desktop.
Failing reactphp/async tests
<?php
declare(strict_types = 1);
use Bunny\Async\Client;
use Bunny\Channel;
use Bunny\Client as SyncClient;
use Bunny\Message;
use PHPUnit\Framework\TestCase;
use React\EventLoop\Loop;
use React\EventLoop\TimerInterface;
use React\Promise\PromiseInterface;
use function React\Async\await;
use function React\Async\series;
final class SampleTest extends TestCase
{
public function testPublishConsume1(): void
{
$this->doTestPublishConsume();
}
public function testPublishConsume2(): void
{
$this->doTestPublishConsume();
}
public function testPublishConsume3(): void
{
$this->doTestPublishConsume();
}
public function doTestPublishConsume(): void
{
// get loop
$loop = Loop::get();
// define variables
$clientOptions = [
'host' => '127.0.0.1',
'vhost' => '/',
'user' => 'guest',
'password' => 'guest',
];
$messages = ['1', '2', '3', '4', '5', '6', '7', '8', '9', '10'];
$queueName = uniqid('TEST');
$consumedMessages = [];
$timers = [];
// connect sync client
$syncBunny = new SyncClient($clientOptions);
$syncBunny->connect();
// connect async client
$asyncBunny = new Client($loop, $clientOptions);
await($asyncBunny->connect());
// create queue
$channelPromise = $asyncBunny->channel();
await($channelPromise->then(static function (Channel $channel) use ($queueName): void {
$channel->queueDeclare($queueName)->then(static function () use ($channel): void {
$channel->close();
});
}));
// consume messages async
$consumer = $asyncBunny->channel()->then(static function (Channel $channel) use ($queueName, &$consumedMessages): PromiseInterface {
return $channel->qos(0, 1, true)->then(static function () use ($channel, $queueName, &$consumedMessages): PromiseInterface {
return $channel->consume(static function (Message $message) use ($channel, &$consumedMessages): void {
$consumedMessages[] = $message->content;
$channel->ack($message);
}, $queueName);
});
});
// publish messages async
$producer = $asyncBunny->channel()->then(static function (Channel $channel) use ($queueName, $messages): PromiseInterface {
return series(array_map(
static fn (string $message): callable => static fn (): PromiseInterface => $channel->publish($message, routingKey: $queueName),
$messages,
));
});
// finish publishing
await($producer);
// failsafe to stop loop
$timers[] = $loop->addTimer(5, static function () use ($loop): void {
$loop->stop();
});
// periodic check if all messages have been consumed
$timers[] = $loop->addPeriodicTimer(1, static function (TimerInterface $timer) use ($loop, $messages, &$consumedMessages): void {
if (count($messages) !== count($consumedMessages)) {
return;
}
$loop->cancelTimer($timer);
$loop->stop();
});
// finish consuming
await($consumer);
// run loop
$loop->run();
// delete test queue
$channel = $syncBunny->channel();
$channel->queueDelete($queueName);
$channel->close();
// cleanup timers
foreach ($timers as $timer) {
$loop->cancelTimer($timer);
}
// assert consumer consumed all messages
self::assertSame($messages, $consumedMessages);
}
public function testSyncPublishWithAsyncConsume(): void
{
// get loop
$loop = Loop::get();
// define variables
$clientOptions = [
'host' => '127.0.0.1',
'vhost' => '/',
'user' => 'guest',
'password' => 'guest',
];
$messages = ['1', '2', '3', '4', '5', '6', '7', '8', '9', '10'];
$queueName = uniqid('TEST');
$consumedMessages = [];
$timers = [];
// connect sync client
$syncBunny = new SyncClient($clientOptions);
$syncBunny->connect();
// connect async client
$asyncBunny = new Client($loop, $clientOptions);
await($asyncBunny->connect());
// create queue
$channel = $syncBunny->channel();
$channel->queueDeclare($queueName);
$channel->close();
// consume messages async
$consumer = $asyncBunny->channel()->then(static function (Channel $channel) use ($queueName, &$consumedMessages): PromiseInterface {
return $channel->qos(0, 1, true)->then(static function () use ($channel, $queueName, &$consumedMessages): PromiseInterface {
return $channel->consume(static function (Message $message) use ($channel, &$consumedMessages): void {
$consumedMessages[] = $message->content;
$channel->ack($message);
}, $queueName);
});
});
// publish messages sync
$channel = $syncBunny->channel();
foreach ($messages as $message) {
$channel->publish($message, routingKey: $queueName);
}
$channel->close();
// failsafe to stop loop
$timers[] = $loop->addTimer(5, static function () use ($loop): void {
$loop->stop();
});
// periodic check if all messages have been consumed
$timers[] = $loop->addPeriodicTimer(1, static function (TimerInterface $timer) use ($loop, $messages, &$consumedMessages): void {
if (count($messages) !== count($consumedMessages)) {
return;
}
$loop->cancelTimer($timer);
$loop->stop();
});
// finish consuming
await($consumer);
// run loop
$loop->run();
// delete test queue
$channel = $syncBunny->channel();
$channel->queueDelete($queueName);
$channel->close();
// cleanup timers
foreach ($timers as $timer) {
$loop->cancelTimer($timer);
}
// assert consumer consumed all messages
self::assertSame($messages, $consumedMessages);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment