Created
May 5, 2023 10:02
-
-
Save tomas-novotny/5abdbd4474c25e768c6eeb168c32cfc1 to your computer and use it in GitHub Desktop.
Failing reactphp/async tests
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php | |
declare(strict_types = 1); | |
use Bunny\Async\Client; | |
use Bunny\Channel; | |
use Bunny\Client as SyncClient; | |
use Bunny\Message; | |
use PHPUnit\Framework\TestCase; | |
use React\EventLoop\Loop; | |
use React\EventLoop\TimerInterface; | |
use React\Promise\PromiseInterface; | |
use function React\Async\await; | |
use function React\Async\series; | |
final class SampleTest extends TestCase | |
{ | |
public function testPublishConsume1(): void | |
{ | |
$this->doTestPublishConsume(); | |
} | |
public function testPublishConsume2(): void | |
{ | |
$this->doTestPublishConsume(); | |
} | |
public function testPublishConsume3(): void | |
{ | |
$this->doTestPublishConsume(); | |
} | |
public function doTestPublishConsume(): void | |
{ | |
// get loop | |
$loop = Loop::get(); | |
// define variables | |
$clientOptions = [ | |
'host' => '127.0.0.1', | |
'vhost' => '/', | |
'user' => 'guest', | |
'password' => 'guest', | |
]; | |
$messages = ['1', '2', '3', '4', '5', '6', '7', '8', '9', '10']; | |
$queueName = uniqid('TEST'); | |
$consumedMessages = []; | |
$timers = []; | |
// connect sync client | |
$syncBunny = new SyncClient($clientOptions); | |
$syncBunny->connect(); | |
// connect async client | |
$asyncBunny = new Client($loop, $clientOptions); | |
await($asyncBunny->connect()); | |
// create queue | |
$channelPromise = $asyncBunny->channel(); | |
await($channelPromise->then(static function (Channel $channel) use ($queueName): void { | |
$channel->queueDeclare($queueName)->then(static function () use ($channel): void { | |
$channel->close(); | |
}); | |
})); | |
// consume messages async | |
$consumer = $asyncBunny->channel()->then(static function (Channel $channel) use ($queueName, &$consumedMessages): PromiseInterface { | |
return $channel->qos(0, 1, true)->then(static function () use ($channel, $queueName, &$consumedMessages): PromiseInterface { | |
return $channel->consume(static function (Message $message) use ($channel, &$consumedMessages): void { | |
$consumedMessages[] = $message->content; | |
$channel->ack($message); | |
}, $queueName); | |
}); | |
}); | |
// publish messages async | |
$producer = $asyncBunny->channel()->then(static function (Channel $channel) use ($queueName, $messages): PromiseInterface { | |
return series(array_map( | |
static fn (string $message): callable => static fn (): PromiseInterface => $channel->publish($message, routingKey: $queueName), | |
$messages, | |
)); | |
}); | |
// finish publishing | |
await($producer); | |
// failsafe to stop loop | |
$timers[] = $loop->addTimer(5, static function () use ($loop): void { | |
$loop->stop(); | |
}); | |
// periodic check if all messages have been consumed | |
$timers[] = $loop->addPeriodicTimer(1, static function (TimerInterface $timer) use ($loop, $messages, &$consumedMessages): void { | |
if (count($messages) !== count($consumedMessages)) { | |
return; | |
} | |
$loop->cancelTimer($timer); | |
$loop->stop(); | |
}); | |
// finish consuming | |
await($consumer); | |
// run loop | |
$loop->run(); | |
// delete test queue | |
$channel = $syncBunny->channel(); | |
$channel->queueDelete($queueName); | |
$channel->close(); | |
// cleanup timers | |
foreach ($timers as $timer) { | |
$loop->cancelTimer($timer); | |
} | |
// assert consumer consumed all messages | |
self::assertSame($messages, $consumedMessages); | |
} | |
public function testSyncPublishWithAsyncConsume(): void | |
{ | |
// get loop | |
$loop = Loop::get(); | |
// define variables | |
$clientOptions = [ | |
'host' => '127.0.0.1', | |
'vhost' => '/', | |
'user' => 'guest', | |
'password' => 'guest', | |
]; | |
$messages = ['1', '2', '3', '4', '5', '6', '7', '8', '9', '10']; | |
$queueName = uniqid('TEST'); | |
$consumedMessages = []; | |
$timers = []; | |
// connect sync client | |
$syncBunny = new SyncClient($clientOptions); | |
$syncBunny->connect(); | |
// connect async client | |
$asyncBunny = new Client($loop, $clientOptions); | |
await($asyncBunny->connect()); | |
// create queue | |
$channel = $syncBunny->channel(); | |
$channel->queueDeclare($queueName); | |
$channel->close(); | |
// consume messages async | |
$consumer = $asyncBunny->channel()->then(static function (Channel $channel) use ($queueName, &$consumedMessages): PromiseInterface { | |
return $channel->qos(0, 1, true)->then(static function () use ($channel, $queueName, &$consumedMessages): PromiseInterface { | |
return $channel->consume(static function (Message $message) use ($channel, &$consumedMessages): void { | |
$consumedMessages[] = $message->content; | |
$channel->ack($message); | |
}, $queueName); | |
}); | |
}); | |
// publish messages sync | |
$channel = $syncBunny->channel(); | |
foreach ($messages as $message) { | |
$channel->publish($message, routingKey: $queueName); | |
} | |
$channel->close(); | |
// failsafe to stop loop | |
$timers[] = $loop->addTimer(5, static function () use ($loop): void { | |
$loop->stop(); | |
}); | |
// periodic check if all messages have been consumed | |
$timers[] = $loop->addPeriodicTimer(1, static function (TimerInterface $timer) use ($loop, $messages, &$consumedMessages): void { | |
if (count($messages) !== count($consumedMessages)) { | |
return; | |
} | |
$loop->cancelTimer($timer); | |
$loop->stop(); | |
}); | |
// finish consuming | |
await($consumer); | |
// run loop | |
$loop->run(); | |
// delete test queue | |
$channel = $syncBunny->channel(); | |
$channel->queueDelete($queueName); | |
$channel->close(); | |
// cleanup timers | |
foreach ($timers as $timer) { | |
$loop->cancelTimer($timer); | |
} | |
// assert consumer consumed all messages | |
self::assertSame($messages, $consumedMessages); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment